Skip to content

Commit

Permalink
TopK optimization to not keep all values, instead have max of k value…
Browse files Browse the repository at this point in the history
…s per window
  • Loading branch information
arunkm committed Dec 21, 2019
1 parent f409a32 commit 74fc7b1
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ protected SortedMultisetAggregateBase(IComparerExpression<T> comparer, QueryCont
Expression<Func<Func<SortedDictionary<T, long>>, SortedMultiSet<T>>> template
= (g) => new SortedMultiSet<T>(g);
var replaced = template.ReplaceParametersInBody(generator);
initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
this.initialState = Expression.Lambda<Func<SortedMultiSet<T>>>(replaced);
}

private readonly Expression<Func<SortedMultiSet<T>>> initialState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,37 @@
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Linq.Expressions;
using Microsoft.StreamProcessing.Internal;

namespace Microsoft.StreamProcessing.Aggregates
{
internal sealed class TopKAggregate<T> : SortedMultisetAggregateBase<T, List<RankedEvent<T>>>
internal sealed class TopKAggregate<T> : IAggregate<T, ITopKState<T>, List<RankedEvent<T>>>
{
private readonly Comparison<T> compiledRankComparer;
private readonly int k;

public TopKAggregate(int k, QueryContainer container) : this(k, ComparerExpression<T>.Default, container) { }
public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container, long hoppingWindowSize)
: this(k, rankComparer, ComparerExpression<T>.Default, container, hoppingWindowSize) { }

public TopKAggregate(int k, IComparerExpression<T> rankComparer, QueryContainer container)
: this(k, rankComparer, ComparerExpression<T>.Default, container) { }

public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer, QueryContainer container)
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
/// <summary>
/// Creates a TopK aggregate and prepares the expression that produces its initial state.
/// </summary>
/// <param name="k">Number of ranked entries to report; required to be positive.</param>
/// <param name="rankComparer">Comparer that defines the rank; required to be non-null.</param>
/// <param name="overallComparer">Comparer combined with the rank comparer (via ThenOrderBy) to order the stored values; required to be non-null.</param>
/// <param name="container">Query container handed to the sorted dictionary generator.</param>
/// <param name="hoppingWindowSize">
/// Selects the state implementation: values strictly between 0 and 1000000 use
/// <see cref="HoppingTopKState{T}"/>, everything else uses <see cref="SimpleTopKState{T}"/>.
/// </param>
public TopKAggregate(int k, IComparerExpression<T> rankComparer, IComparerExpression<T> overallComparer,
QueryContainer container, long hoppingWindowSize)
{
Contract.Requires(rankComparer != null);
Contract.Requires(overallComparer != null);
Contract.Requires(k > 0);
// The compiled delegate is built from Reverse(rankComparer), not from rankComparer itself.
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
this.k = k;

// Template whose parameter (the dictionary generator) is substituted below.
Expression<Func<Func<SortedDictionary<T, long>>, ITopKState<T>>> template;
// NOTE(review): the 1000000 upper bound is not explained here; the value is cast to int for the
// HoppingTopKState constructor, so presumably it limits the buffer capacity - confirm.
if (hoppingWindowSize > 0 && hoppingWindowSize < 1000000)
template = (g) => new HoppingTopKState<T>(k, compiledRankComparer, (int)hoppingWindowSize, g);
else
template = (g) => new SimpleTopKState<T>(g);

// The stored values are ordered by the reversed rank comparer first, then by overallComparer.
var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer);
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container);
// Replace the template parameter with the generator and wrap the result as a parameterless lambda.
var replaced = template.ReplaceParametersInBody(generator);
this.initialState = Expression.Lambda<Func<ITopKState<T>>>(replaced);
}

private static IComparerExpression<T> Reverse(IComparerExpression<T> comparer)
Expand All @@ -53,10 +63,11 @@ private static IComparerExpression<T> ThenOrderBy(IComparerExpression<T> compare
return new ComparerExpression<T>(newExpression);
}

public override Expression<Func<SortedMultiSet<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);
public Expression<Func<ITopKState<T>, List<RankedEvent<T>>>> ComputeResult() => set => GetTopK(set);

private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)
private List<RankedEvent<T>> GetTopK(ITopKState<T> state)
{
var set = state.GetSortedValues();
int count = (int)Math.Min(this.k, set.TotalCount);
var result = new List<RankedEvent<T>>(count);
int nextRank = 1;
Expand All @@ -82,5 +93,20 @@ private List<RankedEvent<T>> GetTopK(SortedMultiSet<T> set)

return result;
}

// Expression that creates a fresh state; assigned in the constructor.
private readonly Expression<Func<ITopKState<T>>> initialState;
/// <summary>Returns the expression that creates the initial TopK state.</summary>
public Expression<Func<ITopKState<T>>> InitialState() => initialState;

// Accumulation forwards to ITopKState.Add; note the argument order flips to (input, timestamp).
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> acc
= (state, timestamp, input) => state.Add(input, timestamp);
/// <summary>Returns the expression that adds one input to the state.</summary>
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Accumulate() => acc;

// De-accumulation forwards to ITopKState.Remove with the same (input, timestamp) argument order.
private static readonly Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> dec
= (state, timestamp, input) => state.Remove(input, timestamp);
/// <summary>Returns the expression that removes one input from the state.</summary>
public Expression<Func<ITopKState<T>, long, T, ITopKState<T>>> Deaccumulate() => dec;

// Difference forwards to ITopKState.RemoveAll on the left state, passing the right state.
private static readonly Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> diff
= (leftState, rightState) => leftState.RemoveAll(rightState);
/// <summary>Returns the expression that removes the right state's entries from the left state.</summary>
public Expression<Func<ITopKState<T>, ITopKState<T>, ITopKState<T>>> Difference() => diff;
}
}
230 changes: 230 additions & 0 deletions Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace Microsoft.StreamProcessing.Aggregates
{
/// <summary>
/// Contract for the state object that backs the TopK aggregate.
/// The mutating members return an <see cref="ITopKState{T}"/> so calls can be used as expressions.
/// </summary>
/// <typeparam name="T">Type of the values held by the state.</typeparam>
public interface ITopKState<T>
{
    /// <summary>
    /// Gets the total number of values currently held by the state.
    /// </summary>
    long Count { get; }

    /// <summary>
    /// Exposes the held values as a sorted multiset.
    /// </summary>
    /// <returns>A <see cref="SortedMultiSet{T}"/> containing the values of this state.</returns>
    SortedMultiSet<T> GetSortedValues();

    /// <summary>
    /// Adds one entry to the state.
    /// </summary>
    /// <param name="input">Value to add.</param>
    /// <param name="timestamp">Timestamp supplied with the value; its use is implementation specific.</param>
    /// <returns>The state after the addition.</returns>
    ITopKState<T> Add(T input, long timestamp);

    /// <summary>
    /// Removes one entry from the state.
    /// </summary>
    /// <param name="input">Value to remove.</param>
    /// <param name="timestamp">Timestamp supplied with the value; its use is implementation specific.</param>
    /// <returns>The state after the removal.</returns>
    ITopKState<T> Remove(T input, long timestamp);

    /// <summary>
    /// Removes the entries described by another state from this state.
    /// </summary>
    /// <param name="other">State whose entries are to be removed.</param>
    /// <returns>The state after the removal.</returns>
    ITopKState<T> RemoveAll(ITopKState<T> other);
}

/// <summary>
/// TopK state that stores every value in one <see cref="SortedMultiSet{T}"/>.
/// The timestamp arguments are accepted for interface compatibility and are not used.
/// </summary>
/// <typeparam name="T">Type of the values held by the state.</typeparam>
internal class SimpleTopKState<T> : ITopKState<T>
{
    private SortedMultiSet<T> values;

    /// <summary>
    /// Creates an empty state whose multiset is built from the supplied dictionary generator.
    /// </summary>
    /// <param name="generator">Factory passed to the <see cref="SortedMultiSet{T}"/> constructor.</param>
    public SimpleTopKState(Func<SortedDictionary<T, long>> generator)
        => this.values = new SortedMultiSet<T>(generator);

    /// <summary>Total number of values in the backing multiset.</summary>
    public long Count
    {
        get { return this.values.TotalCount; }
    }

    /// <summary>Returns the backing multiset itself (no copy is made).</summary>
    public SortedMultiSet<T> GetSortedValues()
    {
        return this.values;
    }

    /// <summary>Adds <paramref name="input"/> to the multiset and returns this instance.</summary>
    public ITopKState<T> Add(T input, long timestamp)
    {
        this.values.Add(input);
        return this;
    }

    /// <summary>Removes <paramref name="input"/> from the multiset and returns this instance.</summary>
    public ITopKState<T> Remove(T input, long timestamp)
    {
        this.values.Remove(input);
        return this;
    }

    /// <summary>Removes the sorted values of <paramref name="other"/> from the multiset and returns this instance.</summary>
    public ITopKState<T> RemoveAll(ITopKState<T> other)
    {
        var toRemove = other.GetSortedValues();
        this.values.RemoveAll(toRemove);
        return this;
    }
}

/// <summary>
/// TopK state for hopping windows. Values are grouped per timestamp: the batch for the most recent
/// timestamp is kept in <see cref="currentValues"/> (pruned to roughly k entries plus ties on the
/// lowest-ranked value), and completed batches are queued in <see cref="previousValues"/>.
/// Removal of single elements is not supported; whole batches are dropped through <see cref="RemoveAll"/>.
/// </summary>
/// <typeparam name="T">Type of the values held by the state.</typeparam>
internal class HoppingTopKState<T> : ITopKState<T>
{
    // Timestamp of the batch held in currentValues (0 until the first value is added).
    public long currentTimestamp;

    // Completed batches as (timestamp, values) pairs, oldest first.
    public CircularBuffer<ValueTuple<long, SortedMultiSet<T>>> previousValues;
    public SortedMultiSet<T> currentValues;

    public int k;

    // Comparison used for ranking: a positive result means the first argument ranks after the second.
    public Comparison<T> rankComparer;
    private readonly Func<SortedDictionary<T, long>> generator;
    private ItemAndCount<T> minValue; // The lowest-ranked value currently kept in currentValues, with its tie count

    /// <summary>
    /// Creates an empty state.
    /// </summary>
    /// <param name="k">Number of top entries to keep per batch.</param>
    /// <param name="rankComparer">Comparison that orders values by rank.</param>
    /// <param name="hoppingWindowSize">Capacity passed to the buffer of previous batches.</param>
    /// <param name="generator">Factory used for every <see cref="SortedMultiSet{T}"/> this state creates.</param>
    public HoppingTopKState(int k, Comparison<T> rankComparer, int hoppingWindowSize, Func<SortedDictionary<T, long>> generator)
    {
        this.k = k;
        this.rankComparer = rankComparer;
        this.generator = generator;
        this.currentValues = new SortedMultiSet<T>(generator);
        this.previousValues = new CircularBuffer<ValueTuple<long, SortedMultiSet<T>>>(hoppingWindowSize);
    }

    /// <summary>
    /// Adds a value to the batch for <paramref name="timestamp"/>, keeping only the top k (plus ties).
    /// </summary>
    /// <param name="input">Value to add.</param>
    /// <param name="timestamp">Timestamp of the value; must not be smaller than the current timestamp.</param>
    /// <returns>This instance.</returns>
    /// <exception cref="ArgumentException">The timestamp is smaller than the current timestamp.</exception>
    public ITopKState<T> Add(T input, long timestamp)
    {
        // Verify that input is added in non-decreasing order
        if (timestamp < this.currentTimestamp)
        {
            throw new ArgumentException("Invalid timestamp");
        }

        // First entry of a batch: either a new timestamp, or the current batch is empty
        // (nothing added yet at the initial timestamp, or the batch was cleared by RemoveAll).
        // In the empty case minValue is uninitialized or stale and must not be compared against.
        if (timestamp > this.currentTimestamp || this.currentValues.IsEmpty)
        {
            MergeCurrentToPrevious();
            this.currentTimestamp = timestamp;
            this.currentValues.Add(input);
            this.minValue = new ItemAndCount<T>(input, 1);
            return this;
        }

        // Subsequent entries of the batch: positive means input ranks after the current minimum.
        int compare = this.rankComparer(input, this.minValue.Item);

        if (this.currentValues.TotalCount < this.k)
        {
            // Fewer than k values so far: always keep the input and track the lowest-ranked value.
            if (compare > 0)
            {
                this.minValue = new ItemAndCount<T>(input, 1);
            }
            else if (compare == 0)
            {
                this.minValue.Count++;
            }

            this.currentValues.Add(input);
            return this;
        }

        if (compare > 0)
        {
            // Already k values and the input ranks after the lowest kept value: drop it.
            return this;
        }

        if (compare == 0)
        {
            // Already k values and the input ties with the lowest kept value: keep it as a tie.
            this.currentValues.Add(input);
            this.minValue.Count++;
            return this;
        }

        // The input ranks ahead of the lowest kept value. Add it, and if the ties on the lowest
        // value are no longer needed to fill k entries, drop them and pick the new lowest value.
        this.currentValues.Add(input);
        var toRemove = this.currentValues.TotalCount - this.k;
        if (toRemove >= this.minValue.Count)
        {
            // NOTE(review): minValue.Count counts ties under rankComparer, while RemoveAll(item)
            // presumably removes entries equal under the multiset's own comparer - confirm they agree.
            this.currentValues.RemoveAll(this.minValue.Item);
            this.minValue = this.currentValues.GetMinItem();
        }

        return this;
    }

    /// <summary>
    /// Not supported by this state.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public ITopKState<T> Remove(T input, long timestamp)
    {
        throw new NotImplementedException("Cannot remove single elements from this state");
    }

    /// <summary>
    /// Drops the batches covered by <paramref name="other"/>: everything when the timestamps are
    /// equal, otherwise all previous batches up to and including the other state's timestamp.
    /// Does nothing when <paramref name="other"/> is empty.
    /// </summary>
    /// <param name="other">State to remove; must be a <see cref="HoppingTopKState{T}"/> when non-empty.</param>
    /// <returns>This instance.</returns>
    /// <exception cref="InvalidOperationException">
    /// The other state is of a different type, or the batch sizes at the matching timestamp differ.
    /// </exception>
    /// <exception cref="ArgumentException">The other state's timestamp is later than this state's.</exception>
    public ITopKState<T> RemoveAll(ITopKState<T> other)
    {
        if (other.Count == 0)
        {
            return this;
        }

        if (!(other is HoppingTopKState<T> otherTopK))
        {
            throw new InvalidOperationException("Cannot remove non-HoppingTopKState from HoppingTopKState");
        }

        // Only a strictly later timestamp is rejected; an equal timestamp is handled below.
        if (otherTopK.currentTimestamp > this.currentTimestamp)
        {
            throw new ArgumentException("Cannot remove entries with current or future timestamp");
        }

        if (otherTopK.currentTimestamp == this.currentTimestamp)
        {
            if (this.currentValues.TotalCount != otherTopK.currentValues.TotalCount)
            {
                throw new InvalidOperationException("Invalid removal");
            }

            this.currentValues.Clear();
            this.previousValues.Clear();
            return this;
        }

        // Drop previous batches, oldest first, up to and including the other state's timestamp.
        while (this.previousValues.Count > 0)
        {
            var first = this.previousValues.PeekFirst();

            if (first.Item1 > otherTopK.currentTimestamp)
            {
                break;
            }

            if (first.Item1 == otherTopK.currentTimestamp &&
                first.Item2.TotalCount != otherTopK.currentValues.TotalCount)
            {
                throw new InvalidOperationException("Invalid removal");
            }

            this.previousValues.Dequeue();
        }

        return this;
    }

    /// <summary>
    /// Builds a new multiset containing the values of all previous batches and the current batch.
    /// This copies every batch and is therefore expensive; it is used when the aggregate result is computed.
    /// </summary>
    /// <returns>A newly created multiset with all values of this state.</returns>
    public SortedMultiSet<T> GetSortedValues()
    {
        var sortedMultiset = new SortedMultiSet<T>(this.generator);

        foreach (var dictItem in this.previousValues.Iterate())
        {
            sortedMultiset.AddAll(dictItem.Item2);
        }

        sortedMultiset.AddAll(this.currentValues);

        return sortedMultiset;
    }

    // Queues the current batch (if non-empty) under its timestamp and starts a fresh, empty batch.
    private void MergeCurrentToPrevious()
    {
        if (!this.currentValues.IsEmpty)
        {
            var newEntry = ValueTuple.Create(this.currentTimestamp, this.currentValues);
            this.previousValues.Enqueue(ref newEntry);
            this.currentValues = new SortedMultiSet<T>(this.generator);
        }
    }

    /// <summary>Total number of values across the current batch and all previous batches.</summary>
    public long Count => this.currentValues.TotalCount + this.previousValues.Iterate().Sum(e => e.Item2.TotalCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public T Dequeue()
[EditorBrowsable(EditorBrowsableState.Never)]
public bool IsEmpty() => this.head == this.tail;

/// <summary>
/// Removes all elements from the list by resetting the head and tail positions to zero - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Clear()
{
    // Equal head and tail is what IsEmpty() reports as empty; stored slots are not overwritten here.
    this.tail = 0;
    this.head = 0;
}

/// <summary>
/// Currently for internal use only - do not use directly.
/// </summary>
Expand Down Expand Up @@ -260,6 +267,17 @@ public IEnumerator<T> GetEnumerator()
}
}

/// <summary>
/// Resets the buffer to empty: head and tail both point at the first inner buffer, which is
/// cleared, and the element count is set to zero. Currently for internal use only - do not use directly.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public void Clear()
{
    var firstNode = this.buffers.First;
    this.head = firstNode;
    this.tail = firstNode;

    // NOTE(review): only the first inner buffer is cleared; any further nodes in the list keep
    // their contents - confirm that later enqueue/dequeue logic tolerates that.
    firstNode.Value.Clear();
    this.Count = 0;
}

System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator();
}
}
Loading

0 comments on commit 74fc7b1

Please sign in to comment.