-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TopK optimization to store not more than k values per window #121
base: master
Are you sure you want to change the base?
TopK optimization to store not more than k values per window #121
Conversation
Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
Outdated
Show resolved
Hide resolved
Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
Outdated
Show resolved
Hide resolved
Sources/Test/SimpleTesting/Aggregates/HoppingWindowTopKAggregate.cs
Outdated
Show resolved
Hide resolved
Sources/Test/SimpleTesting/Aggregates/HoppingWindowTopKAggregate.cs
Outdated
Show resolved
Hide resolved
Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs
Outdated
Show resolved
Hide resolved
var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer); | ||
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container); | ||
var replaced = template.ReplaceParametersInBody(generator); | ||
initialState = Expression.Lambda<Func<ITopKState<T>>>(replaced); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialState [](start = 12, length = 12)
Did you disable the stylecop rules? instance members should be prefixed by "this.". Please make sure to follow style rules.
|
||
var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer); | ||
var generator = combinedComparer.CreateSortedDictionaryGenerator<T, long>(container); | ||
var replaced = template.ReplaceParametersInBody(generator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document what is going on here...
Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
Outdated
Show resolved
Hide resolved
long Count { get; } | ||
} | ||
|
||
internal class SimpleTopKState<T> : ITopKState<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SimpleTopKState [](start = 19, length = 15)
Shouldn't this also be capped t the top K ranks rather than storing everything?
} | ||
|
||
public void Remove(T input, long timestamp) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use expression bodies for small one-liners
/// State used by TopK Aggregate | ||
/// </summary> | ||
/// <typeparam name="T"></typeparam> | ||
public interface ITopKState<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ITopKState [](start = 21, length = 10)
This will wrap every aggregation operation a virtual function call. I wonder if it would be better to consolidate them with a single class and differentiate behavior at runtime like the Min/Max state, especially if we optimize the simple top k state as well so it will also have a comparer, etc.
this.k = k; | ||
this.rankComparer = rankComparer; | ||
this.currentValues = new SortedMultiSet<T>(generator); | ||
this.previousValues = new SortedMultiSet<T>(generator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previousValues [](start = 17, length = 14)
Are there cases where this is never needed? perhaps lazy init?
throw new ArgumentException("Cannot remove entries with current or future timestamp"); | ||
} | ||
previousValues.RemoveAll(otherTopK.currentValues); | ||
previousValues.RemoveAll(otherTopK.previousValues); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you only handle the case where otherTopK.currentTimestamp > currentTimestamp, but allow otherTopK.currentTimestamp == currentTimestamp
return currentValues; | ||
else | ||
{ | ||
MergeCurrentToPrevious(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MergeCurrentToPrevious [](start = 16, length = 22)
This seems to modify state significantly... please validate or at the very least comment why this isn't a problem.
if (!currentValues.IsEmpty) | ||
{ | ||
// Swap so we merge small onto larger | ||
if (previousValues.UniqueCount < currentValues.UniqueCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (previousValues.UniqueCount < currentValues.UniqueCount) [](start = 16, length = 59)
When would this happen? Doesn't seem likely or significant enough to warrant the optimization #Closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{ | ||
currentValues.AddAll(otherTopK.currentValues); | ||
while (currentValues.TotalCount > k) | ||
currentValues.Remove(currentValues.First()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please explain why previousValues is ignored here...
Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs
Outdated
Show resolved
Hide resolved
Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs
Outdated
Show resolved
Hide resolved
Please make sure to honor all style cop (Roslyn analyzers) and VS style guidelines (should be red lines in your environment). If you are using VS and not seeing them let's figure out why. If not please find a way to enforce them locally. |
I realize the old code could use better documentation but the new code you are adding is not trivial and needs to be properly commented/documented. Please take time to comment thoroughly, and improve existing code when appropriate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🕐
5f60996
to
8e069d7
Compare
8e069d7
to
74fc7b1
Compare
No description provided.