-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncProducerConsumerQueue.cs
82 lines (70 loc) · 2.47 KB
/
AsyncProducerConsumerQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace ThreadsAndProblems.ProducerConsumer
{
//More information about 'BlockingCollection' https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/blockingcollection-overview
public class AsyncProducerConsumerQueue<T> : IDisposable
{
private readonly Action<T> m_consumer;
private readonly BlockingCollection<T> m_queue;
private readonly CancellationTokenSource m_cancelTokenSrc;
public AsyncProducerConsumerQueue(Action<T> consumer)
{
m_consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
m_queue = new BlockingCollection<T>(new ConcurrentQueue<T>());
m_cancelTokenSrc = new CancellationTokenSource();
for (int i = 0; i < 5; i++)
Task.Factory.StartNew(() => ConsumeLoop(m_cancelTokenSrc.Token));
//new Thread(() => ConsumeLoop(m_cancelTokenSrc.Token)).Start();
}
public void Produce(T value)
{
m_queue.Add(value);
}
private void ConsumeLoop(CancellationToken cancelToken)
{
while (!cancelToken.IsCancellationRequested)
{
try
{
var item = m_queue.Take(cancelToken);
Console.WriteLine("=====Thread #{1} Consuming item: {0} ===== ", item, Thread.CurrentThread.ManagedThreadId);
m_consumer(item);
Console.WriteLine("===== Finished Consuming =====");
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
}
}
#region IDisposable
private bool m_isDisposed;
protected virtual void Dispose(bool disposing)
{
if (!m_isDisposed)
{
if (disposing)
{
m_cancelTokenSrc.Cancel();
m_cancelTokenSrc.Dispose();
m_queue.Dispose();
}
m_isDisposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
}