Before I continue exploring the heap implementation, I wanted to give some examples of how heaps are used. I always find it easier to understand a data structure if I can think of it in real world terms. How is it used?
Sequencing output
Back in July I wrote a two-part series about turning traditional input, process, output (IPO) applications into producer-consumer applications using multiple threads. The first part explored the issues and the second part presented code that compares different approaches to the problem. I had planned a third part but realized that in order to complete it I needed a priority queue. I could have, and perhaps should have, used a SortedList for my priority queue, but I didn’t. Instead, I put off finishing that series until I could talk about priority queues.
To review, the program has an input thread, a processing thread, and an output thread. The input thread reads a file and places lines on a queue. The processing thread reads individual lines from the input queue, processes them, and places the result on the output queue. The output thread reads lines from the output queue and writes them to a file.
My plan was to add a second processing thread to show how you would speed the program up if the per-line processing took a long time. But if I did that, then lines could go into the output queue in the wrong order and the result would be a scrambled file.
Let me illustrate the problem with integers. The program below starts two processing threads, each of which reads the input queue. The threads simulate processing by sleeping for a short period. One thread takes 500 milliseconds to process an item and the other takes a full second. The output thread reads the output queue and writes to the console.
private void DoStuff()
{
    var inputQueue = new BlockingCollection<int>();
    var outputQueue = new BlockingCollection<int>();

    var processQueue = new Action<int>((delay) =>
    {
        foreach (var i in inputQueue.GetConsumingEnumerable())
        {
            Thread.Sleep(delay);
            outputQueue.Add(i);
        }
    });

    var outputData = new Action(() =>
    {
        foreach (var i in outputQueue.GetConsumingEnumerable())
        {
            Console.Write(i + ", ");
        }
        Console.WriteLine();
    });

    // start two processing threads
    var t1 = Task.Factory.StartNew(() => processQueue(500));
    var t2 = Task.Factory.StartNew(() => processQueue(1000));

    // and one output thread
    var t3 = Task.Factory.StartNew(() => outputData());

    // fill the input queue
    var nums = new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    foreach (var i in nums)
    {
        inputQueue.Add(i);
    }
    inputQueue.CompleteAdding();

    // wait for all processing to be complete
    Task.WaitAll(t1, t2);
    outputQueue.CompleteAdding();

    // and wait for output thread to finish
    t3.Wait();
}
The program’s output is:
0, 1, 2, 4, 3, 5, 7, 6, 8, 9,
Items 3 and 6 are out of order!
The way to solve the problem is for the output method to keep track of which item it’s expecting, and not write until that item comes in. To do that, the output method has to save items that come in out of order. That’s where a priority queue comes in. Rather than pulling items from the output queue and outputting them directly, the program puts each item on a heap and then outputs items as long as the lowest item on the heap is the expected item. Like this:
var outputData = new Action(() =>
{
    var expectedItem = 0;
    var buffer = new BinaryHeapInt();
    foreach (var i in outputQueue.GetConsumingEnumerable())
    {
        buffer.Insert(i);

        // Output any items in the buffer that are in order.
        while (buffer.Count > 0 && buffer.Peek() == expectedItem)
        {
            Console.Write(expectedItem + ", ");
            buffer.Remove();
            ++expectedItem;
        }
    }

    // empty the buffer of all remaining items.
    while (buffer.Count > 0)
    {
        Console.Write(buffer.Remove() + ", ");
    }
    Console.WriteLine();
});
I realize that the code isn’t optimal. In particular, it’s silly to always add the item to the heap, since in most cases the heap will be empty and the item removed from the queue is probably the expected item. Doing it this way illustrates the point more clearly, and the cost of a heap insertion and removal is so small in comparison to whatever output the code will be doing that the small inefficiency just wouldn’t matter.
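For the curious, here is a minimal sketch of the variation that skips the heap when the incoming item is the expected one. It assumes .NET 6 or later, where `PriorityQueue<TElement, TPriority>` stands in for `BinaryHeapInt`, and it assumes every sequence number 0, 1, 2, ... arrives exactly once. The `Resequencer` class and `InOrder` method are names made up for this illustration.

```csharp
using System.Collections.Generic;

public static class Resequencer
{
    // Yields items in sequence order, buffering only those that arrive early.
    // Assumption: the input is a permutation of 0, 1, 2, ...
    public static IEnumerable<int> InOrder(IEnumerable<int> arrivals)
    {
        var expected = 0;
        var buffer = new PriorityQueue<int, int>(); // min-heap keyed on the item itself

        foreach (var item in arrivals)
        {
            if (item == expected)
            {
                // Fast path: this is the item we are waiting for, so skip the heap.
                yield return item;
                ++expected;

                // Its arrival may release items that were buffered earlier.
                while (buffer.Count > 0 && buffer.Peek() == expected)
                {
                    yield return buffer.Dequeue();
                    ++expected;
                }
            }
            else
            {
                // Out of order: hold it until its turn comes.
                buffer.Enqueue(item, item);
            }
        }

        // Flush anything still buffered, lowest first.
        while (buffer.Count > 0)
        {
            yield return buffer.Dequeue();
        }
    }
}
```

Feeding it the scrambled sequence from the earlier run (0, 1, 2, 4, 3, 5, 7, 6, 8, 9) should yield 0 through 9 in order, and only items 4 and 7 ever touch the heap.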
That solution works great as long as items don’t get too far out of order. If most items are processed in a few milliseconds but some items can take several minutes, the heap (buffer in the code above) will have to hold thousands of items. That could be a problem if records are large.
This method also will fail if one of the processing threads drops the ball. The output thread will keep adding items to the buffer, waiting for an item that isn’t coming. But I suppose you have bigger problems than an overflowing output queue if one of your processing threads crashes or goes into an infinite loop on a particular item.
Another solution to the problem would be to make the outputQueue itself a priority queue. That’s possible with BlockingCollection, but it assumes you have a concurrent priority queue class that implements the IProducerConsumerCollection interface. We don’t have such a thing. Yet?
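To give a rough idea of what such a class involves, here is a minimal sketch rather than a production-quality concurrent priority queue. It assumes .NET 6 or later and simply puts a lock around the built-in `PriorityQueue<int, int>`. The class name `LockedIntPriorityQueue` is hypothetical.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical wrapper: a lock-protected min-heap of ints that
// BlockingCollection<int> can use as its underlying store.
public sealed class LockedIntPriorityQueue : IProducerConsumerCollection<int>
{
    private readonly PriorityQueue<int, int> _heap = new PriorityQueue<int, int>();
    private readonly object _sync = new object();

    public bool TryAdd(int item)
    {
        lock (_sync) { _heap.Enqueue(item, item); }
        return true;
    }

    // Removes the smallest item, or returns false if the heap is empty.
    public bool TryTake(out int item)
    {
        lock (_sync) { return _heap.TryDequeue(out item, out _); }
    }

    public int Count
    {
        get { lock (_sync) { return _heap.Count; } }
    }

    // Snapshot of the contents in ascending order.
    public int[] ToArray()
    {
        lock (_sync)
        {
            return _heap.UnorderedItems.Select(e => e.Element).OrderBy(x => x).ToArray();
        }
    }

    public void CopyTo(int[] array, int index) => ToArray().CopyTo(array, index);
    void ICollection.CopyTo(Array array, int index) => ToArray().CopyTo(array, index);
    bool ICollection.IsSynchronized => false;
    object ICollection.SyncRoot => throw new NotSupportedException();

    public IEnumerator<int> GetEnumerator() => ((IEnumerable<int>)ToArray()).GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

You would pass an instance to the `BlockingCollection<int>` constructor, for example `new BlockingCollection<int>(new LockedIntPriorityQueue())`. Note that this only orders the items that are waiting in the queue at the moment of a take. The consumer still has to deal with the case where the smallest waiting item is not the one it expects.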
Selection
Another use for heaps is selection: picking the best items (those with the highest value, however you compute that) from a large list of candidates. In one memorable case, I had to select the most relevant 200 items from a list of over two million. We computed a relevancy score for each of the items, and then selected those that had the highest score. The naive way to do this is to sort the items by relevancy and then take the last 200 from the resulting array. That’s effective, but expensive. Comparison sorting is an O(n log n) operation. Sorting two million items takes a relatively long time.
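For reference, the naive approach might look something like the sketch below. It uses plain integers in place of scored items, and the `NaiveSelection` class and `TopKBySorting` method are names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class NaiveSelection
{
    // Sorts the whole list, then keeps the last k items.
    // The sort costs O(n log n) no matter how small k is.
    public static List<int> TopKBySorting(IEnumerable<int> items, int k)
    {
        var sorted = items.OrderBy(x => x).ToList();
        var skip = Math.Max(0, sorted.Count - k); // guard against k > n
        return sorted.Skip(skip).ToList();
    }
}
```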
With a heap, you can perform the selection in time proportional to n log k, where k is the number of items you want to select. The idea is that you build a heap of the first k items. Then, for each subsequent item in the list, you compare it against the smallest item on the heap (Peek()). If the new item is larger than that smallest item, remove the smallest item and add the new item. It looks like this:
private List<int> SelectTopK(List<int> items, int k)
{
    // build a heap from the first k items
    var heap = new BinaryHeapInt(items.Take(k));

    // for each successive item . . .
    foreach (var i in items.Skip(k))
    {
        // replace the smallest item on the heap if this one is larger
        if (i > heap.Peek())
        {
            heap.Remove();
            heap.Insert(i);
        }
    }

    // build a list from the heap contents
    var result = new List<int>();
    while (heap.Count > 0)
    {
        result.Add(heap.Remove());
    }
    return result;
}
A side effect of using the heap in this way is that the resulting list of top k items is in ascending order.
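If you want to experiment without the `BinaryHeapInt` class, the same idea can be sketched with the built-in `PriorityQueue<TElement, TPriority>`, assuming .NET 6 or later. The `HeapSelection` class and `TopK` method are hypothetical names used only here.

```csharp
using System.Collections.Generic;

public static class HeapSelection
{
    // Keeps a min-heap of the k largest items seen so far.
    // Each item costs at most O(log k), so the whole pass is O(n log k).
    public static List<int> TopK(IEnumerable<int> items, int k)
    {
        var result = new List<int>();
        if (k <= 0)
        {
            return result;
        }

        var heap = new PriorityQueue<int, int>(); // lowest value is dequeued first
        foreach (var item in items)
        {
            if (heap.Count < k)
            {
                // Still filling the heap with the first k items.
                heap.Enqueue(item, item);
            }
            else if (item > heap.Peek())
            {
                // Larger than the smallest survivor: swap it in.
                heap.Dequeue();
                heap.Enqueue(item, item);
            }
        }

        // Draining a min-heap produces the items in ascending order.
        while (heap.Count > 0)
        {
            result.Add(heap.Dequeue());
        }
        return result;
    }
}
```

For example, `HeapSelection.TopK(new[] { 5, 1, 9, 3, 7, 2, 8 }, 3)` returns 7, 8, 9, in that order.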
I discussed that selection algorithm in detail a while back in When theory meets practice.
And more selection
A variation on that selection algorithm is selecting the smallest number of items whose sum is at least N. In the above example you might want to have a list of items whose sum is 5,000 or more, but you don’t want more items than you have to take. So if one number is 5,001, then your list will contain a single number.
This, too, is solved with a heap. The idea is to keep track of the sum, and remove things from the heap if doing so won’t take the sum below the target. The code looks like this:
private List<int> SelectTopSum(List<int> items, int target)
{
    var heap = new BinaryHeapInt();
    int runningTotal = 0;
    foreach (var i in items)
    {
        if (runningTotal < target)
        {
            // Haven't reached the target yet, so just add it
            runningTotal += i;
            heap.Insert(i);
        }
        else if (heap.Count > 0 && i > heap.Peek())
        {
            // Current item is larger than the smallest item on the heap.
            // Remove the small item and add this one.
            runningTotal -= heap.Remove();
            runningTotal += i;
            heap.Insert(i);
        }

        // Remove items from the heap to make runningTotal
        // as small as possible, but still >= target.
        // The Count check keeps Peek() from being called on an empty heap.
        while (heap.Count > 0 && (runningTotal - heap.Peek()) >= target)
        {
            runningTotal -= heap.Remove();
        }
    }

    // build a list from the heap contents
    var result = new List<int>();
    while (heap.Count > 0)
    {
        result.Add(heap.Remove());
    }
    return result;
}
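Here is a self-contained sketch of the same approach that you can run without `BinaryHeapInt`. It assumes .NET 6 or later for `PriorityQueue<TElement, TPriority>`, and it assumes positive values and a positive target. The `SumSelection` class and `FewestWithSumAtLeast` method are names made up for this illustration.

```csharp
using System.Collections.Generic;

public static class SumSelection
{
    // Mirrors the running-total approach: keep a min-heap of chosen items,
    // and discard the smallest ones whenever the total stays at or above target.
    public static List<int> FewestWithSumAtLeast(IEnumerable<int> items, int target)
    {
        var heap = new PriorityQueue<int, int>(); // lowest value is dequeued first
        var runningTotal = 0;

        foreach (var item in items)
        {
            if (runningTotal < target)
            {
                // Haven't reached the target yet, so just take the item.
                runningTotal += item;
                heap.Enqueue(item, item);
            }
            else if (heap.Count > 0 && item > heap.Peek())
            {
                // Trade the smallest chosen item for this larger one.
                runningTotal -= heap.Dequeue();
                runningTotal += item;
                heap.Enqueue(item, item);
            }

            // Shed small items while the total remains at or above the target.
            while (heap.Count > 0 && runningTotal - heap.Peek() >= target)
            {
                runningTotal -= heap.Dequeue();
            }
        }

        var result = new List<int>();
        while (heap.Count > 0)
        {
            result.Add(heap.Dequeue());
        }
        return result;
    }
}
```

With the input 10, 20, 5001, 30 and a target of 5,000, the result is the single item 5001. With 3, 1, 4, 1, 5, 9, 2, 6 and a target of 10, the result is 6, 9.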
Those are just three examples of how I’ve used heaps in production code. There are many more. I find the heap data structure so useful that I’m surprised the .NET Framework’s runtime library doesn’t include one. As you’ll see, it’s not especially difficult to take the rudimentary BinaryMinHeapInt class and turn it into a full-blown collection class that supports generics, IEnumerable<T>, and the other things we’ve come to expect from .NET collection classes.
Before we do that, though, I want to introduce a more generalized heap that is as easy to implement as the binary heap but can be a good bit faster in execution. The binary heap works well and is reasonably fast, but if I can get a good speed improvement for just a little extra work, I’ll take it.