- Core i7 930 @ 2.80GHz (WEP 7.5)
- 2 striped Intel 160GB X25M 2.gen Disks (WEP 7.9)
- 12GB RAM (Operations WEP 7.5)
Have you ever worked with the Producer/Consumer pattern on several threads? If you have your code might haved looked something like this (pseudoocode):
Producer 1 and 2 (2 separate threads that I create several of)
while (work to do){
Do some calculations on obj
Lock (FirstLevelQueue(s)) { // one queue for both or 1 queue for each
FirstLevelQueue(s).Enqueue(obj)
}
}
Producer 3 and 4, that Consumes 1 and 2's queue(s)(2 separate threads)
while (producer 1 and 2 not done){
dequeued = true
while (dequeued) {
Lock (FirstLevelQueue(s)) { dequeued = FirstLevelQueue(s).Dequeue(obj) }
if (dequeued) {
Do some work on obj
while (SecondLevelQueue(s).Count > sizeLimit) Thread.Sleep(100)
Lock (SecondLevelQueue(s)) { SecondLevelQueue(s).Enqueue(obj) }
} else if (some complex criterion) {
Thread.Sleep(100)
}
Thread.Sleep(100)
}
}
}
Consumer (5. thread)
while (producers 3 and 4 not done) {
dequeued = true
while (dequeued) {
Lock (SecondLevelQueue(s)) { dequeued = SecondLevelQueue(s).Dequeue(obj) }
if (dequeued) {
Do some work
} else if (some complex criterion) {
Thread.Sleep(100)
}
Thread.Sleep(100)
}
}
}
I feel this code has a few weak points. First of all, locking is newer good since it takes time and it hinders consumers to dequeue for as long as anyone is queueing. Also in this code there is going to be a lot locks since all access uses this mechanism. Locking can also cause huge problems if you forget one or end up locking for too long, e.g. while sleeping. Second, the sleep statements are going to take a lot of time. In the code, on which I am working, I discovered that even a sleep as short as 10ms was way too long. Furthermore, since the sleep is so short the threads are going to wake up and consume quite a bit of resources while checking if there is any more work to do. Using .Net 4.0 and TPL(Task Parallell Library) you get a somewhat better and nicer code by using the ConcurrentQueue, which is thread-safe:Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
Do some calculations on obj
FirstLevelConcurrentQueue(s).Enqueue(obj) // one queue for both or 1 queue for each
}
Producer 3 and 4, that Consumes 1 and 2's queue(s)(2 separate tasks)
while (producer 1 and 2 not done){
dequeued = true
while (dequeued) {
dequeued = FirstLevelConcurrentQueue(s).TryDequeue(obj)
if (dequeued) {
Do some work on obj
while (SecondLevelConcurrentQueue(s).Count > sizeLimit) Thread.Sleep(100)
SecondLevelConcurrentQueue(s).Enqueue(obj)
} else if (some complex criterion) {
Thread.Sleep(100)
}
Thread.Sleep(100)
}
}
}
Consumer(5. task)
while (producers 3 and 4 not done) {
dequeued = true
while (dequeued) {
dequeued = SecondLevelConcurrentQueue(s).TryDequeue(obj)
if (dequeued) {
Do some work
} else if (some complex criterion) { Thread.Sleep(100)
}
}
}
}
Now, this code looks a lot better since I'm rid of the locks. This reduces some common error points and the consumers do not stop the producers from enqueueing. But there is still a lot of sleep statements, and the threads often wake up to check if there is more work to do, and as stated, for my part, even a 10ms sleep were to long.I decided that I really wanted code that looked somewhat more like this:
Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
Do some calculations on obj
FirstLevelConcurrentQueue(s).EnqueueWithEvent(obj) // one queue for both or 1 queue for each
}
Producer 3 and 4, that Consumes 1 and 2's queue(s)(2 separate tasks)
while (producer 1 and 2 not done){
FirstLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
Do some work on obj
SecondLevelConcurrentQueue(s).EnqueueUpToLimit(obj)
}
Consumer(5. task)
while (producers 3 and 4 not done) {
SecondLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
Do some work
}
In my mind, this code is shorter and way more readable. In addition, the threads do not wake up all the time to check if there is some work to do. This actually makes the code more cpu efficient and gives you more cpu resources to do your other computing.while (work to do){
Do some calculations on obj
FirstLevelConcurrentQueue(s).EnqueueWithEvent(obj) // one queue for both or 1 queue for each
}
Producer 3 and 4, that Consumes 1 and 2's queue(s)(2 separate tasks)
while (producer 1 and 2 not done){
FirstLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
Do some work on obj
SecondLevelConcurrentQueue(s).EnqueueUpToLimit(obj)
}
Consumer(5. task)
while (producers 3 and 4 not done) {
SecondLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
Do some work
}
It didn't take me much googling to discover the AutoResetEvent, an easy-to-use, thread-safe event mechanism. Using the AutoResetEvent and the ConcurrentQueue I implemented a class I called ConcurrentQueueWithWait. I included this in the dll for ParallellExtensionsExtras, which we use at work, and after some tweaking and testing I ended up with this class:
This class meant that I ended up with this implementation of my tasks:public class ConcurrentQueueWithWait : ConcurrentQueue {
private AutoResetEvent _enqueuedEvent = new AutoResetEvent(false);
private AutoResetEvent _dequeuedEvent = new AutoResetEvent(false);
/// The size limit of the ConcurrentQueue, if you use EnqueueUpToLimit for enqueueing
public int QueueLimit { get; set; }
/// The timeout for Dequeue-events, and EnqueueUpToLimit-events
public int TimeOut { get { return _timeOut; } set { _timeOut = value; } }
private int _timeOut = 5000;
/// Sets both the Enqueue and Dequeue events twice to try to avoid any deadlocks
public void ProducerFinished() {
_enqueuedEvent.Set();
_dequeuedEvent.Set();
Thread.Sleep(10);
_enqueuedEvent.Set();
_dequeuedEvent.Set();
}
/// Enqueues the item if the queue limit is not reached. If limit is reached it will wait until an item is
private AutoResetEvent _enqueuedEvent = new AutoResetEvent(false);
private AutoResetEvent _dequeuedEvent = new AutoResetEvent(false);
/// The size limit of the ConcurrentQueue, if you use EnqueueUpToLimit for enqueueing
public int QueueLimit { get; set; }
/// The timeout for Dequeue-events, and EnqueueUpToLimit-events
public int TimeOut { get { return _timeOut; } set { _timeOut = value; } }
private int _timeOut = 5000;
/// Sets both the Enqueue and Dequeue events twice to try to avoid any deadlocks
public void ProducerFinished() {
_enqueuedEvent.Set();
_dequeuedEvent.Set();
Thread.Sleep(10);
_enqueuedEvent.Set();
_dequeuedEvent.Set();
}
/// Enqueues the item if the queue limit is not reached. If limit is reached it will wait until an item is
/// dequeued, and then enqueue.
/// Note: on a timeout it will enqueue wether or not the limit has been reached.
public void EnqueueUpToLimit(T item) {
if (QueueLimit > 0 && base.Count > QueueLimit)
_dequeuedEvent.WaitOne(TimeOut);
base.Enqueue(item);
_enqueuedEvent.Set();
}
/// Adds an object to the end of the ConcurrentQueue, and sends an event to anyone waiting to dequeue.
public void EnqueueWithEvent(T item) {
base.Enqueue(item);
_enqueuedEvent.Set();
}
/// Attempts to remove and return the object at the beginning of the queue. If none is found it will wait for
/// Note: on a timeout it will enqueue wether or not the limit has been reached.
public void EnqueueUpToLimit(T item) {
if (QueueLimit > 0 && base.Count > QueueLimit)
_dequeuedEvent.WaitOne(TimeOut);
base.Enqueue(item);
_enqueuedEvent.Set();
}
/// Adds an object to the end of the ConcurrentQueue, and sends an event to anyone waiting to dequeue.
public void EnqueueWithEvent(T item) {
base.Enqueue(item);
_enqueuedEvent.Set();
}
/// Attempts to remove and return the object at the beginning of the queue. If none is found it will wait for
/// an enqueued signal, or timeout.
/// Should be used as a regular TryDequeue, but without the while(...) Thread.Sleep(xx)
/// Returns wether or not a dequeue was successful
public bool TryWaitUntilDequeued(out T result) {
bool res = true;
if (base.IsEmpty)
res = _enqueuedEvent.WaitOne(TimeOut);
if (!res) {
result = default(T);
return false;
}
res = TryDequeue(out result);
_dequeuedEvent.Set(); //I do not need to check result here, since a false should equal an empty queue
/// Should be used as a regular TryDequeue, but without the while(...) Thread.Sleep(xx)
/// Returns wether or not a dequeue was successful
public bool TryWaitUntilDequeued(out T result) {
bool res = true;
if (base.IsEmpty)
res = _enqueuedEvent.WaitOne(TimeOut);
if (!res) {
result = default(T);
return false;
}
res = TryDequeue(out result);
_dequeuedEvent.Set(); //I do not need to check result here, since a false should equal an empty queue
//and therefore any pending enqueues should be fired
return res;
}
/// Attempts to return the object at the beginning of the queue. If none is found it will wait for an
return res;
}
/// Attempts to return the object at the beginning of the queue. If none is found it will wait for an
/// enqueued signal, or timeout.
/// Should be used as a regular TryPeek, but without the while(...) Thread.Sleep(xx)
/// Returns wether or not a peek was successful
public bool TryWaitUntilPeeked(out T result) {
bool res = true;
if (base.IsEmpty)
res = _enqueuedEvent.WaitOne(TimeOut);
if (!res) {
result = default(T);
return false;
}
res = TryPeek(out result);
return res;
}
}
I had to put in a timeout to avoid deadlocks both on enqueue and dequeue. In tests, which I've run on a small test-set(2 seconds run-time), I get the extra 5 seconds in about 1 out of 6 runthroughs, and I figure that 5 seconds extra don't matter much with run-times normally from 20 minutes and up./// Should be used as a regular TryPeek, but without the while(...) Thread.Sleep(xx)
/// Returns wether or not a peek was successful
public bool TryWaitUntilPeeked(out T result) {
bool res = true;
if (base.IsEmpty)
res = _enqueuedEvent.WaitOne(TimeOut);
if (!res) {
result = default(T);
return false;
}
res = TryPeek(out result);
return res;
}
}
Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
Do some calculations on obj
FirstLevelConcurrentQueue(s).EnqueueWithEvent(obj) // one queue for both or 1 queue for each
}
Producer 3 and 4, that Consumes 1 and 2's queue(s)(2 separate tasks)
while (producer 1 and 2 not done){
while (FirstLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)) {
Do some work on obj
SecondLevelConcurrentQueue(s).EnqueueUpToLimit(obj)
}
}
Consumer(5th task)
while (producers 3 and 4 not done) {
while (SecondLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)) {
Do some work
}
}
Not quite the easy and nice code I was hoping for, but really close enough to make me happy. My threads now only come out of sleep every 5 seconds or when there actually is some work to do.Some minor problems with this implementation:
- If I get a timeout on EnqueueUpToLimit I've set it to just enqueue anyway; so the limit I set on the queue is not absolute. My tests show that this is not a problem for my program, so I'm pleased with it.
- I do get the timeout as an extra runtime in about 1 out of 6 times.
- I still have to use the try-pattern.
- If a bug finishes the 5th task before the 3rd and 4th, my program gets an extreme runtime since the 3rd and 4th constantly waits for the enqueue-timeout. This appeared to be a Heisenbug when I encountered it.
- There is still some plumbing code needed, but way less than before.