Skip to content

Commit

Permalink
Merge remote-tracking branch 'thingsboard/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ViacheslavKlimov committed Mar 19, 2021
2 parents e2b455e + 3624066 commit 4d174c3
Showing 1 changed file with 69 additions and 16 deletions.
85 changes: 69 additions & 16 deletions ui-ngx/src/app/core/api/data-aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,56 @@ interface AggData {
aggValue: any;
}

interface AggregationMap {
[key: string]: Map<number, AggData>;
class AggDataMap {
rangeChanged = false;
private minTs = Number.MAX_SAFE_INTEGER;
private map = new Map<number, AggData>();

set(ts: number, data: AggData) {
if (ts < this.minTs) {
this.rangeChanged = true;
this.minTs = ts;
}
this.map.set(ts, data);
}

get(ts: number): AggData {
return this.map.get(ts);
}

delete(ts: number) {
this.map.delete(ts);
}

forEach(callback: (value: AggData, key: number, map: Map<number, AggData>) => void, thisArg?: any) {
this.map.forEach(callback, thisArg);
}

size(): number {
return this.map.size;
}
}

class AggregationMap {
aggMap: {[key: string]: AggDataMap} = {};

detectRangeChanged(): boolean {
let changed = false;
for (const key of Object.keys(this.aggMap)) {
const aggDataMap = this.aggMap[key];
if (aggDataMap.rangeChanged) {
changed = true;
aggDataMap.rangeChanged = false;
}
}
return changed;
}

clearRangeChangedFlags() {
for (const key of Object.keys(this.aggMap)) {
this.aggMap[key].rangeChanged = false;
}
}
}

declare type AggFunction = (aggData: AggData, value?: any) => void;
Expand Down Expand Up @@ -170,20 +218,25 @@ export class DataAggregator {
updateIntervalScheduledTime = false;
}
if (update) {
this.aggregationMap = {};
this.aggregationMap = new AggregationMap();
this.updateAggregatedData(data.data);
} else {
this.aggregationMap = this.processAggregatedData(data.data);
}
if (updateIntervalScheduledTime) {
this.intervalScheduledTime = this.utils.currentPerfTime();
}
this.aggregationMap.clearRangeChangedFlags();
this.onInterval(history, detectChanges);
} else {
this.updateAggregatedData(data.data);
if (history) {
this.intervalScheduledTime = this.utils.currentPerfTime();
this.onInterval(history, detectChanges);
} else {
if (this.aggregationMap.detectRangeChanged()) {
this.onInterval(false, detectChanges, true);
}
}
}
}
Expand All @@ -203,17 +256,18 @@ export class DataAggregator {
}
}

private onInterval(history?: boolean, detectChanges?: boolean) {
private onInterval(history?: boolean, detectChanges?: boolean, rangeChanged?: boolean) {
const now = this.utils.currentPerfTime();
this.elapsed += now - this.intervalScheduledTime;
this.intervalScheduledTime = now;
if (this.intervalTimeoutHandle) {
clearTimeout(this.intervalTimeoutHandle);
this.intervalTimeoutHandle = null;
}
const intervalTimeout = rangeChanged ? this.aggregationTimeout - this.elapsed : this.aggregationTimeout;
if (!history) {
const delta = Math.floor(this.elapsed / this.subsTw.aggregation.interval);
if (delta || !this.data) {
if (delta || !this.data || rangeChanged) {
const tickTs = delta * this.subsTw.aggregation.interval;
if (this.subsTw.quickInterval) {
const currentDate = this.getCurrentTime();
Expand All @@ -234,16 +288,16 @@ export class DataAggregator {
this.updatedData = false;
}
if (!history) {
this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), this.aggregationTimeout);
this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), intervalTimeout);
}
}

private updateData(): SubscriptionData {
this.tsKeyNames.forEach((key) => {
this.dataBuffer[key] = [];
});
for (const key of Object.keys(this.aggregationMap)) {
const aggKeyData = this.aggregationMap[key];
for (const key of Object.keys(this.aggregationMap.aggMap)) {
const aggKeyData = this.aggregationMap.aggMap[key];
let keyData = this.dataBuffer[key];
aggKeyData.forEach((aggData, aggTimestamp) => {
if (aggTimestamp <= this.startTs) {
Expand Down Expand Up @@ -300,12 +354,12 @@ export class DataAggregator {

private processAggregatedData(data: SubscriptionData): AggregationMap {
const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
const aggregationMap: AggregationMap = {};
const aggregationMap = new AggregationMap();
for (const key of Object.keys(data)) {
let aggKeyData = aggregationMap[key];
let aggKeyData = aggregationMap.aggMap[key];
if (!aggKeyData) {
aggKeyData = new Map<number, AggData>();
aggregationMap[key] = aggKeyData;
aggKeyData = new AggDataMap();
aggregationMap.aggMap[key] = aggKeyData;
}
const keyData = data[key];
keyData.forEach((kvPair) => {
Expand All @@ -326,10 +380,10 @@ export class DataAggregator {
private updateAggregatedData(data: SubscriptionData) {
const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
for (const key of Object.keys(data)) {
let aggKeyData = this.aggregationMap[key];
let aggKeyData = this.aggregationMap.aggMap[key];
if (!aggKeyData) {
aggKeyData = new Map<number, AggData>();
this.aggregationMap[key] = aggKeyData;
aggKeyData = new AggDataMap();
this.aggregationMap.aggMap[key] = aggKeyData;
}
const keyData = data[key];
keyData.forEach((kvPair) => {
Expand Down Expand Up @@ -374,4 +428,3 @@ export class DataAggregator {
}

}

0 comments on commit 4d174c3

Please sign in to comment.