diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs index 645f9f226..aa31c9dc0 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs @@ -17,7 +17,7 @@ protected SortedMultisetAggregateBase(IComparerExpression comparer, QueryCont Expression>, SortedMultiSet>> template = (g) => new SortedMultiSet(g); var replaced = template.ReplaceParametersInBody(generator); - initialState = Expression.Lambda>>(replaced); + this.initialState = Expression.Lambda>>(replaced); } private readonly Expression>> initialState; diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs index 15ebdacff..2b07ee362 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs @@ -6,27 +6,37 @@ using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Linq.Expressions; +using Microsoft.StreamProcessing.Internal; namespace Microsoft.StreamProcessing.Aggregates { - internal sealed class TopKAggregate : SortedMultisetAggregateBase>> + internal sealed class TopKAggregate : IAggregate, List>> { private readonly Comparison compiledRankComparer; private readonly int k; - public TopKAggregate(int k, QueryContainer container) : this(k, ComparerExpression.Default, container) { } + public TopKAggregate(int k, IComparerExpression rankComparer, QueryContainer container, long hoppingWindowSize) + : this(k, rankComparer, ComparerExpression.Default, container, hoppingWindowSize) { } - public TopKAggregate(int k, IComparerExpression rankComparer, QueryContainer container) - : this(k, rankComparer, ComparerExpression.Default, container) { } - - public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpression overallComparer, QueryContainer container) - : base(ThenOrderBy(Reverse(rankComparer), overallComparer), container) + public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpression overallComparer, + QueryContainer container, long hoppingWindowSize) { Contract.Requires(rankComparer != null); Contract.Requires(overallComparer != null); Contract.Requires(k > 0); this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile(); this.k = k; + + Expression>, ITopKState>> template; + if (hoppingWindowSize > 0 && hoppingWindowSize < 1000000) + template = (g) => new HoppingTopKState(k, compiledRankComparer, (int)hoppingWindowSize, g); + else + template = (g) => new SimpleTopKState(g); + + var combinedComparer = ThenOrderBy(Reverse(rankComparer), overallComparer); + var generator = combinedComparer.CreateSortedDictionaryGenerator(container); + var replaced = template.ReplaceParametersInBody(generator); + this.initialState = Expression.Lambda>>(replaced); } private static IComparerExpression Reverse(IComparerExpression comparer) @@ -53,10 +63,11 @@ private static IComparerExpression ThenOrderBy(IComparerExpression compare return new ComparerExpression(newExpression); } - public override Expression, List>>> ComputeResult() => set => GetTopK(set); + public Expression, List>>> ComputeResult() => set => GetTopK(set); - private List> GetTopK(SortedMultiSet set) + private List> GetTopK(ITopKState state) { + var set = state.GetSortedValues(); int count = (int)Math.Min(this.k, set.TotalCount); var result = new List>(count); int nextRank = 1; @@ -82,5 +93,20 @@ private List> GetTopK(SortedMultiSet set) return result; } + + private readonly Expression>> initialState; + public Expression>> InitialState() => initialState; + + private static readonly Expression, long, T, ITopKState>> acc + = (state, timestamp, input) => state.Add(input, timestamp); + public Expression, long, T, ITopKState>> Accumulate() => acc; + + private static readonly Expression, long, T, ITopKState>> dec + = (state, timestamp, input) => state.Remove(input, timestamp); + public Expression, long, T, ITopKState>> Deaccumulate() => dec; + + private static readonly Expression, ITopKState, ITopKState>> diff + = (leftState, rightState) => leftState.RemoveAll(rightState); + public Expression, ITopKState, ITopKState>> Difference() => diff; } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs new file mode 100644 index 000000000..4313df3a0 --- /dev/null +++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKState.cs @@ -0,0 +1,230 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Microsoft.StreamProcessing.Aggregates +{ + /// + /// State used by TopK Aggregate + /// + /// + public interface ITopKState + { + /// + /// Add a single entry + /// + /// + /// + ITopKState Add(T input, long timestamp); + + /// + /// Removes the specified entry + /// + /// + /// + ITopKState Remove(T input, long timestamp); + + /// + /// Removes entries from other + /// + /// + ITopKState RemoveAll(ITopKState other); + + /// + /// Gets the values as sorted set + /// + /// + SortedMultiSet GetSortedValues(); + + /// + /// Returns total number of values in the set + /// + long Count { get; } + } + + internal class SimpleTopKState : ITopKState + { + private SortedMultiSet values; + + public SimpleTopKState(Func> generator) + { + this.values = new SortedMultiSet(generator); + } + + public long Count => this.values.TotalCount; + + public ITopKState Add(T input, long timestamp) + { + this.values.Add(input); + return this; + } + + public SortedMultiSet GetSortedValues() => this.values; + + public ITopKState Remove(T input, long timestamp) + { + this.values.Remove(input); + return this; + } + + public ITopKState RemoveAll(ITopKState other) + { + this.values.RemoveAll(other.GetSortedValues()); + return this; + } + } + + internal class HoppingTopKState : ITopKState + { + public long currentTimestamp; + + public CircularBuffer>> previousValues; + public SortedMultiSet currentValues; + + public int k; + + public Comparison rankComparer; + private Func> generator; + private ItemAndCount minValue; // The minimum threshold value in TopK + + public HoppingTopKState(int k, Comparison rankComparer, int hoppingWindowSize, Func> generator) + { + this.k = k; + this.rankComparer = rankComparer; + this.currentValues = new SortedMultiSet(generator); + this.previousValues = new CircularBuffer>>(hoppingWindowSize); + this.generator = generator; + } + + public ITopKState Add(T input, long timestamp) + { + // Verify that input is added in non-decreasing order + if (timestamp < this.currentTimestamp) + { + throw new ArgumentException("Invalid timestamp"); + } + + // First entry in new hop window, just add the value + if (timestamp > this.currentTimestamp) + { + MergeCurrentToPrevious(); + this.currentTimestamp = timestamp; + this.currentValues.Add(input); + this.minValue = new ItemAndCount(input, 1); + return this; + } + + // these are subsequent entries + int compare = rankComparer(input, this.minValue.Item); + + if (this.currentValues.TotalCount < this.k) // if we have not reached k yet, add it + { + if (compare > 0) + this.minValue = new ItemAndCount(input, 1); + else if (compare == 0) + this.minValue.Count++; + + this.currentValues.Add(input); + return this; + } + else if (compare > 0) // We have reached k and new input is smaller than minimum + { + return this; + } + else if (compare == 0) // We have reached k and new input is equal to the minimum + { + this.currentValues.Add(input); // add to get ties + this.minValue.Count++; + return this; + } + else // The new item is less than minValue, so we need to remove some entries to make place for the new entry + { + this.currentValues.Add(input); + var toRemove = this.currentValues.TotalCount - this.k; + if (toRemove >= minValue.Count) + { + this.currentValues.RemoveAll(this.minValue.Item); + this.minValue = this.currentValues.GetMinItem(); + } + return this; + } + } + + public ITopKState Remove(T input, long timestamp) + { + throw new NotImplementedException("Cannot remove single elements from this state"); + } + + public ITopKState RemoveAll(ITopKState other) + { + if (other.Count != 0) + { + if (other is HoppingTopKState otherTopK) + { + if (otherTopK.currentTimestamp > this.currentTimestamp) + { + throw new ArgumentException("Cannot remove entries with current or future timestamp"); + } + else if (otherTopK.currentTimestamp == this.currentTimestamp) + { + if (this.currentValues.TotalCount != otherTopK.currentValues.TotalCount) + throw new InvalidOperationException("Invalid removal"); + + this.currentValues.Clear(); + this.previousValues.Clear(); + } + else + { + while (this.previousValues.Count > 0) + { + var first = this.previousValues.PeekFirst(); + + if (first.Item1 > otherTopK.currentTimestamp) + { + break; + } + + if (first.Item1 == otherTopK.currentTimestamp && + first.Item2.TotalCount != otherTopK.currentValues.TotalCount) + throw new InvalidOperationException("Invalid removal"); + + this.previousValues.Dequeue(); + } + } + } + else + { + throw new InvalidOperationException("Cannot remove non-HoppingTopKState from HoppingTopKState"); + } + } + return this; + } + + // This function merges the current values to previous and is expensive + // Currently it is only called by ComputeResult + public SortedMultiSet GetSortedValues() + { + var sortedMultiset = new SortedMultiSet(generator); + + foreach (var dictItem in this.previousValues.Iterate()) + { + sortedMultiset.AddAll(dictItem.Item2); + } + sortedMultiset.AddAll(this.currentValues); + + return sortedMultiset; + } + + private void MergeCurrentToPrevious() + { + if (!this.currentValues.IsEmpty) + { + var newEntry = ValueTuple.Create(this.currentTimestamp, this.currentValues); + this.previousValues.Enqueue(ref newEntry); + this.currentValues = new SortedMultiSet(generator); + } + } + + public long Count => this.currentValues.TotalCount + this.previousValues.Iterate().Sum(e => e.Item2.TotalCount); + } +} diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs index 3df34156c..fff6bf7be 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs @@ -105,6 +105,13 @@ public T Dequeue() [EditorBrowsable(EditorBrowsableState.Never)] public bool IsEmpty() => this.head == this.tail; + /// + /// Removes alll elements from the list - do not use directly. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public void Clear() => this.head = this.tail = 0; + /// /// Currently for internal use only - do not use directly. /// @@ -260,6 +267,17 @@ public IEnumerator GetEnumerator() } } + /// + /// Currently for internal use only - do not use directly. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public void Clear() + { + this.tail = this.head = this.buffers.First; + this.head.Value.Clear(); + this.Count = 0; + } + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() => GetEnumerator(); } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs index 84ff1aeaf..412baf5a5 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs @@ -182,6 +182,25 @@ public SortedMultiSet Remove(T key) return this; } + /// + /// Remove all items matching the key + /// + /// The element that should be removed from the current object. + /// A reference to the current Sorted Multiset to allow for functional composition. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [EditorBrowsable(EditorBrowsableState.Never)] + public SortedMultiSet RemoveAll(T key) + { + this.Elements.TryGetValue(key, out long keyCount); + if (keyCount == 0) + throw new ArgumentException("Attempting to remove element not in set."); + + this.Elements.Remove(key); + + this.count -= keyCount; + return this; + } + /// /// Remove the given element from the current Sorted Multiset the given number of times. /// @@ -221,6 +240,21 @@ public SortedMultiSet RemoveAll(SortedMultiSet set) return this; } + /// + /// Remove all of the elements in the given set from the current Sorted Multiset, inluding multiplicity. + /// + /// The Sorted Multiset containing the elements that should be removed from the current object. + /// A reference to the current Sorted Multiset to allow for functional composition. + [EditorBrowsable(EditorBrowsableState.Never)] + public SortedMultiSet RemoveAll(IEnumerable> itemAndCount) + { + Contract.Requires(itemAndCount != null); + foreach (var keyAndCount in itemAndCount) + Remove(keyAndCount.Item, keyAndCount.Count); + + return this; + } + /// /// Clear the contents of the Sorted Multiset. /// @@ -267,5 +301,52 @@ public IEnumerable GetEnumerable() yield return keyAndCount.Key; } } + + /// + /// Get all elements, including multiplicity, of the Sorted Multiset in sort order. + /// + /// An object that enumerates all of the elements in the Sorted Multiset, including multiplicity. + [EditorBrowsable(EditorBrowsableState.Never)] + public IEnumerable> GetItemCountEnumerable() + { + foreach (var keyAndCount in this.Elements) + { + yield return new ItemAndCount(keyAndCount.Key, keyAndCount.Value); + } + } + + /// + /// Get the smallest element (and its number of occurances) in the collection + /// + /// The smallest element and its count + [EditorBrowsable(EditorBrowsableState.Never)] + public ItemAndCount GetMinItem() + { + var minItem = this.Elements.LastOrDefault(); + return new ItemAndCount(minItem.Key, minItem.Value); + } + } + + /// + /// Represents an Item and its count used in MultiSet + /// + /// + public struct ItemAndCount + { + internal ItemAndCount(T item, long count) + { + Item = item; + Count = count; + } + + /// + /// Item type used in Multiset + /// + public readonly T Item; + + /// + /// Count of items with same value + /// + public long Count { get; set; } } } diff --git a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs index 198521763..7ee4a74ea 100644 --- a/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs +++ b/Sources/Core/Microsoft.StreamProcessing/Windows/Window.cs @@ -162,25 +162,31 @@ public IAggregate, TValue> Max(Expression /// Computes a time-sensitive top-k aggregate using snapshot semantics based on a key selector. /// - public IAggregate, List>> TopK(Expression> orderer, int k) + public IAggregate, List>> TopK(Expression> orderer, int k) { Invariant.IsNotNull(orderer, nameof(orderer)); Invariant.IsPositive(k, nameof(k)); var orderComparer = ComparerExpression.Default.TransformInput(orderer); - var aggregate = new TopKAggregate(k, orderComparer, this.Properties.QueryContainer); + bool isHoppingWindow = !Properties.IsTumbling && Properties.IsConstantDuration && Properties.ConstantDurationLength.HasValue && + Properties.IsConstantHop && Properties.ConstantHopLength.HasValue && (Properties.ConstantDurationLength.Value % Properties.ConstantHopLength.Value) == 0; + long hoppingWindowSize = isHoppingWindow ? (Properties.ConstantDurationLength.Value / Properties.ConstantHopLength.Value) : 0; + var aggregate = new TopKAggregate(k, orderComparer, this.Properties.QueryContainer, hoppingWindowSize); return aggregate.SkipNulls().ApplyFilter(this.Filter); } /// /// Computes a time-sensitive top-k aggregate using snapshot semantics based on a key selector with the provided ordering comparer. /// - public IAggregate, List>> TopK(Expression> orderer, IComparerExpression comparer, int k) + public IAggregate, List>> TopK(Expression> orderer, IComparerExpression comparer, int k) { Invariant.IsNotNull(orderer, nameof(orderer)); Invariant.IsNotNull(comparer, nameof(comparer)); Invariant.IsPositive(k, nameof(k)); var orderComparer = comparer.TransformInput(orderer); - var aggregate = new TopKAggregate(k, orderComparer, this.Properties.QueryContainer); + bool isHoppingWindow = !Properties.IsTumbling && Properties.IsConstantDuration && Properties.ConstantDurationLength.HasValue && + Properties.IsConstantHop && Properties.ConstantHopLength.HasValue && (Properties.ConstantHopLength.Value % Properties.ConstantDurationLength.Value) == 0; + long hoppingWindowSize = isHoppingWindow ? (Properties.ConstantDurationLength.Value / Properties.ConstantHopLength.Value) : 0; + var aggregate = new TopKAggregate(k, orderComparer, this.Properties.QueryContainer, hoppingWindowSize); return aggregate.SkipNulls().ApplyFilter(this.Filter); } diff --git a/Sources/Test/SimpleTesting/Aggregates/HoppingWindowTopKAggregate.cs b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowTopKAggregate.cs new file mode 100644 index 000000000..2cfa02b91 --- /dev/null +++ b/Sources/Test/SimpleTesting/Aggregates/HoppingWindowTopKAggregate.cs @@ -0,0 +1,220 @@ +// ********************************************************************* +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License +// ********************************************************************* +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.StreamProcessing; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace SimpleTesting +{ + [TestClass] + public class HoppingWindowTopKAggregate : TestWithConfigSettingsAndMemoryLeakDetection + { + private const long HopSize = 10; + private const long WindowSize = 4 * HopSize; + private const int K = 3; + + private StreamEvent EndEvent = StreamEvent.CreatePunctuation(StreamEvent.InfinitySyncTime); + private Random random = new Random(Seed: (int)DateTime.UtcNow.Ticks); + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateSimple() + { + var input = new[] + { + StreamEvent.CreateStart(1, 10), + StreamEvent.CreateStart(3, 20), + StreamEvent.CreateStart(6, 10), + StreamEvent.CreateStart(8, 30), + StreamEvent.CreateStart(12, 20), + StreamEvent.CreateStart(18, 10), + EndEvent + }; + + var output = input.ToStreamable().HoppingWindowLifetime(10, 5).TopK(K); + var outputValues = output.Select(re => re.Select(t => t.Payload).ToArray()) + .ToStreamEventArray().Where(e => e.IsData).ToArray(); + + var expected = new[] + { + StreamEvent.CreateStart(5, new[] { 20, 10 }), + StreamEvent.CreateEnd(10, 5, new[] { 20, 10 }), + StreamEvent.CreateStart(10, new[] { 30, 20, 10, 10 }), + StreamEvent.CreateEnd(15, 10, new[] { 30, 20, 10, 10 }), + StreamEvent.CreateStart(15, new[] { 30, 20, 10 }), + StreamEvent.CreateEnd(20, 15, new[] { 30, 20, 10 }), + StreamEvent.CreateStart(20, new[] { 20, 10 }), + StreamEvent.CreateEnd(25, 20, new[] { 20, 10 }), + StreamEvent.CreateStart(25, new[] { 10 }), + StreamEvent.CreateEnd(30, 25, new[] { 10 }), + }; + + Assert.AreEqual(expected.Length, outputValues.Length); + + for (int i = 0; i < expected.Length; i++) + { + Assert.AreEqual(expected[i].StartTime, outputValues[i].StartTime); + Assert.AreEqual(expected[i].EndTime, outputValues[i].EndTime); + Assert.AreEqual(expected[i].Payload.Length, outputValues[i].Payload.Length); + for (int j = 0; j < expected[i].Payload.Length; j++) + Assert.AreEqual(expected[i].Payload[j], outputValues[i].Payload[j]); + } + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateRandomDistribution() + { + // Distribution: [1,2,3,4,5] hops : 10% each, Closely-Spaced: 50% + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => + { + var hopType = random.Next(1, 11); + return (hopType < 6) ? (hopType * HopSize) : (hopType - 5); + }); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateUnaligned() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => 10, + windowSize: 27); + + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => random.Next(100, 110), + distanceGenerator: () => 10, + windowSize: 17); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateAllIncreasing() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => v + random.Next(1, 11), + distanceGenerator: () => HopSize); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateAllDecreasing() + { + GenerateDataAndTestInput( + numValues: 1000, + valueGenerator: v => (v == 0) ? 10000 : v - random.Next(1, 11), + distanceGenerator: () => HopSize); + } + + [TestMethod, TestCategory("Gated")] + public void TestHoppingWindowTopKAggregateCyclingValues() + { + // Test values 1 -> 2 -> 3 -> 1, and run for 1/2, 1, 2, 4, 8 intervals of hops + for (long distance = HopSize / 2; distance <= HopSize * 8; distance *= 2) + { + GenerateDataAndTestInput( + numValues: 10, + valueGenerator: v => (v % 3) + 1, + distanceGenerator: () => distance); + } + } + + private void GenerateDataAndTestInput( + int numValues, + Func valueGenerator, + Func distanceGenerator, + long windowSize = WindowSize) + { + var input = new List>(); + long maxStartTime = 0; + + long startTime = 0; + int value = 100; + for (int i = 0; i < numValues; i++) + { + startTime += distanceGenerator(); + value = valueGenerator(value); + input.Add(StreamEvent.CreateStart(startTime, value)); + maxStartTime = Math.Max(maxStartTime, startTime); + } + input.Add(EndEvent); + + TestHoppingWindowTopKAggregateInternal(input, maxStartTime, windowSize); + } + + private void TestHoppingWindowTopKAggregateInternal(IEnumerable> streamEvents, long maxStartTime, long windowSize) + { + var output = streamEvents.ToStreamable().HoppingWindowLifetime(windowSize, HopSize).TopK(K); + + var correct = new List>>(); + + maxStartTime += windowSize; + for (long startTime = 0; startTime < maxStartTime; startTime += HopSize) + { + var eventsInWindow = streamEvents.Where(x => x.StartTime > (startTime - windowSize) && x.StartTime <= startTime); + + if (!eventsInWindow.Any()) + continue; + + var dataInWindow = eventsInWindow.Select(se => se.Payload).ToList(); + dataInWindow.Sort((x, y) => y.CompareTo(x)); + + var last = dataInWindow[Math.Min(K, dataInWindow.Count) - 1]; + var topKInWindow = dataInWindow.TakeWhile(e => e >= last).ToList(); + + correct.Add(StreamEvent.CreateStart(startTime, topKInWindow)); + correct.Add(StreamEvent.CreateEnd(startTime + HopSize, startTime, topKInWindow)); + } + + var output2 = output.Select(e => e.Select(re => re.Payload).ToList()); + + var expected = NormalizeToInterval(correct).ToList(); + var actual = NormalizeToInterval(output2.ToStreamEventArray()).ToList(); + + Assert.AreEqual(expected.Count, actual.Count); + + for (int i = 0; i < expected.Count; i++) + { + Assert.AreEqual(expected[i].Kind, actual[i].Kind); + Assert.AreEqual(expected[i].StartTime, actual[i].StartTime); + Assert.AreEqual(expected[i].EndTime, actual[i].EndTime); + CollectionAssert.AreEquivalent(expected[i].Payload, actual[i].Payload); + } + } + + private IEnumerable>> NormalizeToInterval(IEnumerable>> streamEvents) + { + var result = new List>>(); + + var endEvents = streamEvents.Where(se => se.Kind == StreamEventKind.End || se.Kind == StreamEventKind.Interval); + + if (!endEvents.Any()) + return result; + + var firstEvent = endEvents.First(); + StreamEvent> curInterval = StreamEvent.CreateInterval(firstEvent.StartTime, firstEvent.EndTime, firstEvent.Payload); + + foreach (var streamEvent in endEvents.Skip(1)) + { + if ((streamEvent.StartTime == curInterval.EndTime || streamEvent.Kind == StreamEventKind.Interval) && // Merge into current interval if payload is same + Enumerable.SequenceEqual(streamEvent.Payload, curInterval.Payload)) + { + curInterval.OtherTime = streamEvent.EndTime; + } + else + { + result.Add(curInterval); + curInterval = StreamEvent.CreateInterval(streamEvent.StartTime, streamEvent.EndTime, streamEvent.Payload); ; + } + } + result.Add(curInterval); + return result; + } + } +} diff --git a/Sources/Test/SimpleTesting/Program.cs b/Sources/Test/SimpleTesting/Program.cs index e1b97e1e9..80f4397f2 100644 --- a/Sources/Test/SimpleTesting/Program.cs +++ b/Sources/Test/SimpleTesting/Program.cs @@ -49,7 +49,7 @@ public static IStreamable ToCleanStreamable(this Stre return input.OrderBy(v => v.SyncTime).ToArray().ToStreamable(); } - public static IStreamable ToStreamable(this StreamEvent[] input) + public static IStreamable ToStreamable(this IEnumerable> input) { Invariant.IsNotNull(input, "input"); diff --git a/Sources/Test/SimpleTesting/SimpleTesting.csproj b/Sources/Test/SimpleTesting/SimpleTesting.csproj index aedf068f0..93c7c8e97 100644 --- a/Sources/Test/SimpleTesting/SimpleTesting.csproj +++ b/Sources/Test/SimpleTesting/SimpleTesting.csproj @@ -63,6 +63,7 @@ +