Wednesday, July 21, 2010

ConcurrentQueueWithWait

For the time being, I'm working on a rather heavy computing problem at work. A normal-case scenario has a computing time of about 1 hour, and a not unlikely, but bad, scenario has a computing time of about 7 hours. One "problem" is that I'm already using a pretty powerful computer:
  • Core i7 930 @ 2.80GHz (WEI 7.5)
  • 2 striped Intel 160GB X25-M 2nd-gen disks (WEI 7.9)
  • 12GB RAM (memory operations WEI 7.5)
I've created several producers and consumers, and though I've done a lot of code optimizing, I've got more to go on. My main drag on performance was the SQLite connection. SQLite is an embedded single-connection database, and I have some constraints on the interpretation of the data that force me to write data more or less sequentially. This limits me to using one thread to execute the SQLiteCommands, and with the phx-lib, the SQLiteCommands are prepared only when execution is called. Preparing the commands is quite time consuming, so I've limited the queue of commands to avoid spending more time than necessary on preparations. Once a command is prepared, I can change the values of its parameters without having to do the complete preparation step over again. I use the ObjectPool to generate and store the SQLiteCommands.
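Setting the phx-lib specifics aside, the prepare-once / rebind-parameters pattern can be sketched with the System.Data.SQLite wrapper. The table and parameter names here are made up for illustration; the point is that Prepare() runs once, while the loop only swaps parameter values:

```csharp
using System;
using System.Data.SQLite; // NuGet package: System.Data.SQLite

class PreparedInsertSketch
{
    static void Main()
    {
        using (var conn = new SQLiteConnection("Data Source=:memory:"))
        {
            conn.Open();
            using (var create = new SQLiteCommand(
                "CREATE TABLE results (id INTEGER, value REAL)", conn))
                create.ExecuteNonQuery();

            // Prepare the command once...
            using (var insert = new SQLiteCommand(
                "INSERT INTO results (id, value) VALUES (@id, @value)", conn))
            {
                insert.Parameters.Add(new SQLiteParameter("@id"));
                insert.Parameters.Add(new SQLiteParameter("@value"));
                insert.Prepare();

                // ...then only rebind the parameter values per execution,
                // skipping the costly preparation step each time.
                for (int i = 0; i < 1000; i++)
                {
                    insert.Parameters["@id"].Value = i;
                    insert.Parameters["@value"].Value = i * 0.5;
                    insert.ExecuteNonQuery();
                }
            }

            using (var count = new SQLiteCommand(
                "SELECT COUNT(*) FROM results", conn))
                Console.WriteLine(count.ExecuteScalar()); // 1000
        }
    }
}
```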
Have you ever worked with the Producer/Consumer pattern on several threads? If you have, your code might have looked something like this (pseudocode):
Producer 1 and 2 (2 separate threads that I create several of)
while (work to do){
  Do some calculations on obj
  Lock (FirstLevelQueue(s)) { // one queue for both or 1 queue for each
    FirstLevelQueue(s).Enqueue(obj)
  }
}
Producer 3 and 4, which consume 1 and 2's queue(s) (2 separate threads)
while (producer 1 and 2 not done){
  dequeued = true
  while (dequeued) {
    Lock (FirstLevelQueue(s)) { dequeued = FirstLevelQueue(s).Dequeue(obj) }
    if (dequeued) {
      Do some work on obj
      while (SecondLevelQueue(s).Count > sizeLimit) Thread.Sleep(100)
      Lock (SecondLevelQueue(s)) { SecondLevelQueue(s).Enqueue(obj) }
    } else if (some complex criterion) {
      Thread.Sleep(100) 
    }
  }
}
Consumer (5th thread)
while (producers 3 and 4 not done) {
  dequeued = true
  while (dequeued) {
    Lock (SecondLevelQueue(s)) { dequeued = SecondLevelQueue(s).Dequeue(obj) }
    if (dequeued) {
      Do some work
    } else if (some complex criterion) {
      Thread.Sleep(100) 
    }
  }
}
I feel this code has a few weak points. First of all, locking is never good, since it takes time and it stops consumers from dequeuing for as long as anyone is enqueueing. Also, this code is going to involve a lot of locks, since all access uses this mechanism. Locking can also cause huge problems if you forget one or end up holding a lock for too long, e.g. while sleeping. Second, the sleep statements are going to take a lot of time. In the code I am working on, I discovered that even a sleep as short as 10ms was way too long. Furthermore, since the sleep is so short, the threads are going to wake up and consume quite a bit of resources while checking if there is any more work to do. Using .NET 4.0 and the TPL (Task Parallel Library) you get somewhat better and nicer code by using the ConcurrentQueue, which is thread-safe:
Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
  Do some calculations on obj
  FirstLevelConcurrentQueue(s).Enqueue(obj) // one queue for both or 1 queue for each
}

Producer 3 and 4, which consume 1 and 2's queue(s) (2 separate tasks)
while (producer 1 and 2 not done){
  dequeued = true
  while (dequeued) {
    dequeued = FirstLevelConcurrentQueue(s).TryDequeue(obj)
    if (dequeued) {
      Do some work on obj
      while (SecondLevelConcurrentQueue(s).Count > sizeLimit) Thread.Sleep(100)
      SecondLevelConcurrentQueue(s).Enqueue(obj)
    } else if (some complex criterion) {
      Thread.Sleep(100) 
    }
  }
}

Consumer (5th task)
while (producers 3 and 4 not done) {
  dequeued = true
  while (dequeued) {
    dequeued = SecondLevelConcurrentQueue(s).TryDequeue(obj)
    if (dequeued) {
      Do some work
    } else if (some complex criterion) {
      Thread.Sleep(100)
    }
  }
}
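Stripped of the surrounding producer/consumer plumbing, the ConcurrentQueue calls in the pseudocode above look like this in real C#. No locks are needed, and TryDequeue simply returns false on an empty queue instead of throwing:

```csharp
using System;
using System.Collections.Concurrent;

class ConcurrentQueueSketch
{
    static void Main()
    {
        var queue = new ConcurrentQueue<string>();

        // Any thread may enqueue without taking a lock.
        queue.Enqueue("first");
        queue.Enqueue("second");

        // TryDequeue is the lock-free equivalent of the
        // Lock { dequeued = Queue.Dequeue(obj) } pattern above.
        string item;
        while (queue.TryDequeue(out item))
            Console.WriteLine(item);

        // Returns false instead of throwing when the queue is empty.
        Console.WriteLine(queue.TryDequeue(out item)); // False
    }
}
```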
Now, this code looks a lot better, since I'm rid of the locks. This removes some common error points, and the consumers do not stop the producers from enqueueing. But there are still a lot of sleep statements, the threads often wake up to check if there is more work to do, and, as stated, in my case even a 10ms sleep was too long.
I decided that I really wanted code that looked somewhat more like this:
Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
  Do some calculations on obj
  FirstLevelConcurrentQueue(s).EnqueueWithEvent(obj) // one queue for both or 1 queue for each
}

Producer 3 and 4, which consume 1 and 2's queue(s) (2 separate tasks)
while (producer 1 and 2 not done){
  FirstLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
  Do some work on obj
  SecondLevelConcurrentQueue(s).EnqueueUpToLimit(obj)
}

Consumer (5th task)
while (producers 3 and 4 not done) {
  SecondLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)
  Do some work
}
In my mind, this code is shorter and way more readable. In addition, the threads do not wake up all the time to check whether there is work to do. This actually makes the code more CPU efficient and leaves you more CPU resources for your other computing.
It didn't take me much googling to discover the AutoResetEvent, an easy-to-use, thread-safe event mechanism. Using the AutoResetEvent and the ConcurrentQueue, I implemented a class I called ConcurrentQueueWithWait. I included this in the DLL for ParallelExtensionsExtras, which we use at work, and after some tweaking and testing I ended up with this class:
public class ConcurrentQueueWithWait<T> : ConcurrentQueue<T> {
  private AutoResetEvent _enqueuedEvent = new AutoResetEvent(false);
  private AutoResetEvent _dequeuedEvent = new AutoResetEvent(false);

  /// The size limit of the ConcurrentQueue, if you use EnqueueUpToLimit for enqueueing
  public int QueueLimit { get; set; }

  /// The timeout for Dequeue-events, and EnqueueUpToLimit-events
  public int TimeOut { get { return _timeOut; } set { _timeOut = value; } }
  private int _timeOut = 5000;

  /// Sets both the Enqueue and Dequeue events twice to try to avoid any deadlocks
  public void ProducerFinished() {
    _enqueuedEvent.Set();
    _dequeuedEvent.Set();
    Thread.Sleep(10);
    _enqueuedEvent.Set();
    _dequeuedEvent.Set();
  }

  /// Enqueues the item if the queue limit is not reached. If limit is reached it will wait until an item is 
  /// dequeued, and then enqueue.
  /// Note: on a timeout it will enqueue whether or not the limit has been reached.
  public void EnqueueUpToLimit(T item) {
    if (QueueLimit > 0 && base.Count >= QueueLimit)
      _dequeuedEvent.WaitOne(TimeOut);
    base.Enqueue(item);
    _enqueuedEvent.Set();
  }

  /// Adds an object to the end of the ConcurrentQueue, and sends an event to anyone waiting to dequeue.
  public void EnqueueWithEvent(T item) {
    base.Enqueue(item);
    _enqueuedEvent.Set();
  }

  /// Attempts to remove and return the object at the beginning of the queue. If none is found it will wait for 
  /// an enqueued signal, or timeout.
  /// Should be used as a regular TryDequeue, but without the while(...) Thread.Sleep(xx)
  /// Returns whether or not a dequeue was successful
  public bool TryWaitUntilDequeued(out T result) {
    bool res = true;
    if (base.IsEmpty)
      res = _enqueuedEvent.WaitOne(TimeOut);
    if (!res) {
      result = default(T);
      return false;
    }
    res = TryDequeue(out result);
    _dequeuedEvent.Set(); // I do not need to check the result here, since a false should equal an empty queue,
                          // and therefore any pending enqueues should be fired.
    return res;
  }

  /// Attempts to return the object at the beginning of the queue. If none is found it will wait for an 
  /// enqueued signal, or timeout.
  /// Should be used as a regular TryPeek, but without the while(...) Thread.Sleep(xx)
  /// Returns whether or not a peek was successful
  public bool TryWaitUntilPeeked(out T result) {
    bool res = true;
    if (base.IsEmpty)
      res = _enqueuedEvent.WaitOne(TimeOut);
    if (!res) {
      result = default(T);
      return false;
    }
    res = TryPeek(out result);
    return res;
  }
}
I had to put in a timeout to avoid deadlocks, both on enqueue and dequeue. In tests I've run on a small test set (2 seconds run-time), I get the extra 5 seconds in about 1 out of 6 runthroughs, and I figure that 5 extra seconds don't matter much with run-times normally from 20 minutes and up.
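The AutoResetEvent semantics the class relies on are worth spelling out: WaitOne blocks until someone calls Set (which releases exactly one waiter and resets the event automatically), and the WaitOne(timeout) overload returns false if no signal arrives in time. A minimal illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class AutoResetEventSketch
{
    static void Main()
    {
        var signal = new AutoResetEvent(false);

        // No one has called Set yet, so this times out and returns false.
        Console.WriteLine(signal.WaitOne(50)); // False

        // A Set from another task wakes the waiter almost immediately,
        // instead of the waiter polling with Thread.Sleep.
        Task.Factory.StartNew(() => signal.Set());
        Console.WriteLine(signal.WaitOne(5000)); // True

        // The event resets itself after releasing one waiter,
        // so a second wait times out again.
        Console.WriteLine(signal.WaitOne(50)); // False
    }
}
```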

This class meant that I ended up with this implementation of my tasks:
Producer 1 and 2 (2 separate tasks that I create several of)
while (work to do){
  Do some calculations on obj
  FirstLevelConcurrentQueue(s).EnqueueWithEvent(obj) // one queue for both or 1 queue for each
}

Producer 3 and 4, which consume 1 and 2's queue(s) (2 separate tasks)
while (producer 1 and 2 not done){
  while (FirstLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)) {
    Do some work on obj
    SecondLevelConcurrentQueue(s).EnqueueUpToLimit(obj)
  }
}

Consumer (5th task)
while (producers 3 and 4 not done) {
  while (SecondLevelConcurrentQueue(s).TryWaitUntilDequeued(obj)) {
    Do some work
  }
}
Not quite the easy and nice code I was hoping for, but close enough to make me happy. My threads now only wake up every 5 seconds, or when there actually is some work to do.
Some minor problems with this implementation:
  • If I get a timeout in EnqueueUpToLimit, I've set it to just enqueue anyway, so the limit I set on the queue is not absolute. My tests show that this is not a problem for my program, so I'm pleased with it.
  • I do get the timeout as extra runtime in about 1 out of 6 runs.
  • I still have to use the try-pattern.
  • If a bug finishes the 5th task before the 3rd and 4th, my program gets an extreme runtime, since the 3rd and 4th constantly wait for the enqueue timeout. This appeared as a Heisenbug when I encountered it.
  • There is still some plumbing code needed, but way less than before.
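To round off, here is a trimmed, self-contained sketch of the same enqueue-with-event / wait-until-dequeued idea. The helper names mirror the class above, but this is a cut-down single-producer illustration with hard-coded timeouts, not the full implementation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
    static AutoResetEvent enqueued = new AutoResetEvent(false);

    // Enqueue and signal anyone waiting to dequeue.
    static void EnqueueWithEvent(int item)
    {
        queue.Enqueue(item);
        enqueued.Set();
    }

    // Wait for an enqueued signal (or the timeout) instead of
    // polling with while (...) Thread.Sleep(xx).
    static bool TryWaitUntilDequeued(out int result, int timeOut)
    {
        if (queue.IsEmpty && !enqueued.WaitOne(timeOut))
        {
            result = 0;
            return false;
        }
        return queue.TryDequeue(out result);
    }

    static void Main()
    {
        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 1; i <= 100; i++) EnqueueWithEvent(i);
        });

        int sum = 0, value;
        // Keep consuming until the producer is done and the queue is drained.
        while (!producer.IsCompleted || !queue.IsEmpty)
            if (TryWaitUntilDequeued(out value, 500))
                sum += value;

        Console.WriteLine(sum); // 5050
    }
}
```

The consumer's loop condition does the job of the "producers not done" check in the pseudocode above: a false return from TryWaitUntilDequeued just sends it back to re-check, so a spurious timeout costs time but never correctness.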