This is the second in a short series about using multithreading to speed traditional input-process-output (IPO) processes. Part 1 described the problem and showed solutions in pseudo code. This part presents C# source code that compares the approaches.
A C# program that reads a text file, processes each line, and outputs the result is a pretty simple thing to write. The only “heavy lifting,” if any, is in the processing part. The input and output are standard. I write a lot of simple utility programs that process text files, and I’ve developed a skeleton program that takes care of all the boilerplate for me. The heart of that template is a method that looks a lot like this:
void ProcessFile_SingleThread(string inputFilename, string outputFilename,
    Func<string, string> processLine)
{
    using (StreamWriter outputFile = new StreamWriter(outputFilename))
    {
        foreach (string line in File.ReadLines(inputFilename))
        {
            string rslt = processLine(line);
            outputFile.WriteLine(rslt);
        }
    }
}
Or, if you really want to be concise:
File.WriteAllLines(outputFilename,
    File.ReadAllLines(inputFilename).Select(processLine));
Given that, I just have to write a method that takes a string parameter, processes it, and returns a string result. For example, if I just want to reverse the string, I'd write my ReverseLine method:
private string ReverseLine(string s)
{
    return new string(s.Reverse().ToArray());
}
And then call it like this:
ProcessFile_SingleThread("foo.txt", "oof.txt", ReverseLine);
If it really is something as simple as reversing a line, I'd probably use a lambda:
ProcessFile_SingleThread("foo.txt", "oof.txt",
    (s) => new string(s.Reverse().ToArray()));
However it’s called, that method opens the output file, reads the input file line-by-line, processes the line, and writes it to the output. That’s really all there is to it.
The method that uses asynchronous reads and writes isn’t much more complicated:
void ProcessFile_UsingAsync(string inputFilename, string outputFilename,
    Func<string, string> processLine)
{
    using (var outputFile = new StreamWriter(outputFilename))
    {
        Task writeTask = null;
        using (var inputFile = new StreamReader(inputFilename))
        {
            string line;
            // start reading the first line
            var readTask = inputFile.ReadLineAsync();
            // querying readTask.Result blocks until the task is complete
            while ((line = readTask.Result) != null)
            {
                // start the next read
                readTask = inputFile.ReadLineAsync();
                // while that's happening, process the line
                var rslt = processLine(line);
                // wait for the previous write (if any) to complete
                if (writeTask != null)
                {
                    writeTask.Wait();
                }
                // start writing the line
                writeTask = outputFile.WriteLineAsync(rslt);
            }
        }
        // wait for the final write to complete
        if (writeTask != null)
        {
            writeTask.Wait();
        }
    }
}
I know it looks more involved, but a lot of those lines are just white space. It does take a little more work to start the asynchronous reads and writes, and to wait for them to complete when necessary, but it’s really not that complex.
Unfortunately, that asynchronous version runs slower than the simple single-threaded version. The problem is that asynchronous transactions take some non-zero time to set up, apparently more time than it takes to read/write a single line from/to a text file. If disks weren’t so fast or my processing were much slower, the asynchronous version would be faster because the sub-tasks would overlap.
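If you want to see the difference on your own data, a Stopwatch around each call is enough. This is just a rough sketch (the file names are placeholders), and the results depend entirely on your disk speed and your per-line processing:
var sw = Stopwatch.StartNew();
ProcessFile_SingleThread("foo.txt", "out1.txt", ReverseLine);
Console.WriteLine("Single-threaded: {0} ms", sw.ElapsedMilliseconds);

sw.Restart();
ProcessFile_UsingAsync("foo.txt", "out2.txt", ReverseLine);
Console.WriteLine("Async I/O:       {0} ms", sw.ElapsedMilliseconds);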
It turns out that I don't very often use ReadLineAsync and WriteLineAsync. I do, however, use the asynchronous methods for reading from a FileStream. The older BeginRead/EndRead calls (and the corresponding BeginWrite/EndWrite calls) save a lot of time when reading and writing large blocks of binary data. They're a bit difficult to use at first, but they're very effective. .NET 4.5 introduced ReadAsync and WriteAsync, which are dead simple to use and perform very well. Converting the method above to process binary data using those methods is a straightforward job.
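A conversion along those lines might look something like this sketch. The 64 KB buffer size and the processBlock delegate are just placeholders for illustration; the idea is to use two buffers so each write can overlap the next read:
async Task ProcessFile_BinaryAsync(string inputFilename, string outputFilename,
    Action<byte[], int> processBlock)
{
    const int BufferSize = 64 * 1024;
    using (var input = new FileStream(inputFilename, FileMode.Open, FileAccess.Read,
        FileShare.Read, BufferSize, useAsync: true))
    using (var output = new FileStream(outputFilename, FileMode.Create, FileAccess.Write,
        FileShare.None, BufferSize, useAsync: true))
    {
        // two buffers let the next read overlap the previous write
        var readBuffer = new byte[BufferSize];
        var writeBuffer = new byte[BufferSize];
        Task writeTask = null;

        int bytesRead = await input.ReadAsync(readBuffer, 0, readBuffer.Length);
        while (bytesRead > 0)
        {
            // process the block we just read (in place)
            processBlock(readBuffer, bytesRead);

            // wait for the previous write before reusing its buffer
            if (writeTask != null)
            {
                await writeTask;
            }

            // swap buffers, then write this block while the next read proceeds
            var temp = writeBuffer;
            writeBuffer = readBuffer;
            readBuffer = temp;
            writeTask = output.WriteAsync(writeBuffer, 0, bytesRead);

            bytesRead = await input.ReadAsync(readBuffer, 0, readBuffer.Length);
        }
        // wait for the final write to complete
        if (writeTask != null)
        {
            await writeTask;
        }
    }
}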
The last technique I discussed in Part 1 was having three independent tasks: a reader, a processor, and a writer, all of which communicate through queues. The idea is to fire up the reader and have it start putting lines into a queue. The processing thread reads a line from the queue, processes it, and writes the result to a second queue: the output queue. The writer task reads the output queue and writes data to disk. As you can see here, the .NET BlockingCollection class (introduced in .NET 4.0) makes this fairly easy to do.
const int QueueSize = 10000;
private BlockingCollection<string> _inputQueue = new BlockingCollection<string>(QueueSize);
private BlockingCollection<string> _outputQueue = new BlockingCollection<string>(QueueSize);

private void ReadInput(string inputFilename)
{
    foreach (var line in File.ReadLines(inputFilename))
    {
        _inputQueue.Add(line);
    }
    _inputQueue.CompleteAdding();
}

private void WriteOutput(string outputFilename)
{
    using (var outputFile = new StreamWriter(outputFilename))
    {
        foreach (var line in _outputQueue.GetConsumingEnumerable())
        {
            outputFile.WriteLine(line);
        }
    }
}

void ProcessFile_Multithread(string inputFilename, string outputFilename,
    Func<string, string> processLine)
{
    var reader = Task.Factory.StartNew(() => ReadInput(inputFilename), TaskCreationOptions.LongRunning);
    var writer = Task.Factory.StartNew(() => WriteOutput(outputFilename), TaskCreationOptions.LongRunning);
    foreach (var line in _inputQueue.GetConsumingEnumerable())
    {
        var newline = processLine(line);
        _outputQueue.Add(newline);
    }
    _outputQueue.CompleteAdding();
    reader.Wait();
    writer.Wait();
}
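Calling it is no different from calling the single-threaded version. For example, using the same line-reversing lambda from earlier:
ProcessFile_Multithread("foo.txt", "oof.txt",
    (s) => new string(s.Reverse().ToArray()));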
Again, it looks more complex than it really is, and I’ve split things up to make it more clear what’s happening. Let’s take a closer look at each part of the program.
The ReadInput method uses the same File.ReadLines method that the single-threaded program uses. But instead of processing the line immediately, it puts the line on the input queue. The queue is sized to hold up to 10,000 lines. If the queue fills up, then the call to Add will block until there is space available. Once all of the lines have been read and added to the queue, the method calls CompleteAdding to notify the queue that no more data will be added. If you subsequently call Add on the queue, it will throw an exception.
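For example, this little snippet (purely an illustration, not something you'd write for real) throws an InvalidOperationException on the last line:
var queue = new BlockingCollection<string>();
queue.Add("first");
queue.CompleteAdding();
queue.Add("second");    // throws InvalidOperationException: adding is completed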
The writer thread opens the output file and reads the output queue, writing each line to the file as it's received. This code makes use of the GetConsumingEnumerable method, which returns an enumerator that makes working with the queue a little nicer. Without it, the code would look something like this:
string line;
while (_inputQueue.TryTake(out line, -1))
{
    var rslt = processLine(line);
    _outputQueue.Add(rslt);
}
The second parameter to TryTake is a timeout, in milliseconds. The value -1 signifies an infinite timeout. I find it much more clear to use GetConsumingEnumerable here.
Reading that code, you might wonder how we tell the difference between a queue that's temporarily empty and a queue that's empty because there's no more data. That's where CompleteAdding comes in. When somebody calls CompleteAdding on a queue, the IsAddingCompleted property is set to true. If that flag is not set and the queue is empty, then the assumption is that it's only temporarily empty. But if IsAddingCompleted is true and the queue has no items in it, we know that it will not be getting any more data. All of the data has been consumed.
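BlockingCollection wraps those two checks up in its IsCompleted property, which is true once IsAddingCompleted is set and the collection is empty. If you were draining the queue by hand rather than with GetConsumingEnumerable, a sketch of the loop (using the same queues and processLine delegate as above) might look like this:
while (!_inputQueue.IsCompleted)
{
    string line;
    // TryTake returns false if CompleteAdding was called while we were waiting
    if (_inputQueue.TryTake(out line, -1))
    {
        _outputQueue.Add(processLine(line));
    }
}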
The main processing loop combines reading the input queue and writing the output queue. And when it's done consuming the input and filling the output, it calls CompleteAdding on the output queue. Then it waits for both tasks to complete (the reader will already be done).
By the way, you can write all of that in a single method using anonymous methods, like this:
void ProcessFile_Multithread(string inputFilename, string outputFilename,
    Func<string, string> processLine)
{
    const int QueueSize = 10000;
    BlockingCollection<string> _inputQueue = new BlockingCollection<string>(QueueSize);
    BlockingCollection<string> _outputQueue = new BlockingCollection<string>(QueueSize);
    var reader = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(inputFilename))
        {
            _inputQueue.Add(line);
        }
        _inputQueue.CompleteAdding();
    }, TaskCreationOptions.LongRunning);
    var writer = Task.Factory.StartNew(() =>
    {
        using (var outputFile = new StreamWriter(outputFilename))
        {
            foreach (var line in _outputQueue.GetConsumingEnumerable())
            {
                outputFile.WriteLine(line);
            }
        }
    }, TaskCreationOptions.LongRunning);
    foreach (var line in _inputQueue.GetConsumingEnumerable())
    {
        var newline = processLine(line);
        _outputQueue.Add(newline);
    }
    _outputQueue.CompleteAdding();
    reader.Wait();
    writer.Wait();
}
Which you use is largely a matter of preference. I like that I can limit the scope of the queues this way, although I could have done that by passing them as parameters to the ReadInput and WriteOutput methods. The syntax takes some getting used to, and if the code in the tasks is at all involved the method gets to be pretty long. It's also tempting to write code that depends too much on variables that are defined in the method body. It's a balancing act and, as I said, largely a matter of taste. Or a matter of coding standards (that is, somebody else's taste).
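If you did want to keep the separate methods and still limit the queues' scope, a sketch of that alternative might look like this (the queues then become locals in ProcessFile_Multithread and are passed in):
private void ReadInput(string inputFilename, BlockingCollection<string> inputQueue)
{
    foreach (var line in File.ReadLines(inputFilename))
    {
        inputQueue.Add(line);
    }
    inputQueue.CompleteAdding();
}

private void WriteOutput(string outputFilename, BlockingCollection<string> outputQueue)
{
    using (var outputFile = new StreamWriter(outputFilename))
    {
        foreach (var line in outputQueue.GetConsumingEnumerable())
        {
            outputFile.WriteLine(line);
        }
    }
}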
The time it takes the single-threaded version to run is easy to compute: input-time + processing-time + output-time. The tasks have to be done sequentially with no overlap, so the whole is exactly equal to the sum of the parts.
The version that does asynchronous I/O runs in time Max(input-time, processing-time, output-time), plus a small amount of time for the first read and the last write. But input-time and output-time are longer because each line read or written requires some time to issue the asynchronous request. So input-time, which in the single-threaded version is just (number-of-lines * time-to-read-one-line), becomes (number-of-lines * (request-time + time-to-read-one-line)). output-time is similarly affected.
The last version also takes time Max(input-time, processing-time, output-time), but the difference is that input-time is only (startup-time + (number-of-lines * time-to-read-one-line)). The startup-time is almost exactly the same as with the single-threaded version. Same goes for output-time. There is a small amount of per-line overhead in adding and removing things on the queues, but those operations are very quick (copying a string reference). As a result, there is a lot more opportunity for the input and output to overlap with each other and with processing.
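To put some made-up round numbers on that (purely for illustration): suppose reading a big file takes 10 minutes, processing takes 20 minutes, and writing takes 15 minutes. The single-threaded version needs 10 + 20 + 15 = 45 minutes. If the per-line request overhead of the asynchronous version roughly doubled the effective input and output times, it would need about Max(20, 20, 30) = 30 minutes. The queued version needs about Max(10, 20, 15) = 20 minutes, plus a few seconds of startup and drain time.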
The single-threaded version of the program is going to be much faster than either of the other two when you're working with small files or when the per-line processing time is very short. With larger files, the third version becomes more attractive. If the per-line processing is much longer than the per-line input or output time, then you have to choose between the last two versions of the program. The version that uses asynchronous I/O can be attractive in this case because its asynchronous requests occupy thread pool threads for a very short time. If the program is doing other asynchronous processing as well as processing the file, this frees those pool threads. In contrast, the last version keeps two threads occupied (but not necessarily busy) for the entire duration of the program. Because the reader and writer are started with TaskCreationOptions.LongRunning, the scheduler typically gives them dedicated threads rather than drawing from the pool, but either way that's two threads tied up for as long as the program runs.
I use the last version for most of my programs, simply because it performs almost as fast as the single-threaded version for small files, and hugely faster on larger files because the input and output largely overlap. For the smaller files, the slightly increased time just doesn’t matter. But it matters a lot when I’m processing a 30 gigabyte file.
As I think I've shown, it's pretty easy these days to upgrade an old single-threaded IPO program so that it uses multiple threads to increase performance. More importantly, we've done it without having to delve into the mysteries of parallel array processing or mucking about with synchronization primitives. There's nary a lock, semaphore, event, mutex, or any such in the code. Just some simple Tasks and a concurrent collection. Simple multithreading is easy these days, and by mastering a few simple techniques you can safely increase your program's throughput.
In most of the programs I've done this with, I've reduced the time from (input-time + process-time + output-time) to, essentially, output-time, which is often very close to a 50% performance increase. The only times that's failed are when the per-line (or per-record) processing time is huge, making the program execute in essentially process-time. Still, these techniques essentially eliminate input and output time in those cases. When it takes 10 minutes to read (and 15 minutes to write) a 30 gigabyte file, that's nothing to sneeze at. But if the processing takes several hours, saving the reading and writing time doesn't seem like a big win.
It seems like we should be able to do something to reduce that processing time. That’s the subject of the next part.