Throttling Asynchronous Tasks

I was recently tasked with load testing CouchDB. We have been using the document store very heavily for several new projects and we really needed to understand what it was capable of. More importantly, we needed to try and identify where it starts to fall apart. Knowing the performance characteristics of the software you are using is vital when making decisions about scaling.

The first step was to identify a large data set. I was about to go grab the Stack Overflow data when a friend of mine pointed me to Wikipedia. Now this is a set of data to toy with. It's an XML file containing the more than 9.5 million pages that make up Wikipedia. Let that soak in for a minute. Unzipped it weighs in at 25.4GB, and I'm sure it has grown since I last downloaded it.

Time to play!
The general structure of the data looks like this:

    public class Page {
        public string Id { get; set; }
        public string Title { get; set; }
        public string Redirect { get; set; }
        public Revision Revision { get; set; }
    }

    public class Revision {
        public string Id { get; set; }
        public DateTime Timestamp { get; set; }
        public Contributor Contributor { get; set; }
        public string Minor { get; set; }
        public string Comment { get; set; }
        public string Text { get; set; }
    }

    public class Contributor {
        public string Username { get; set; }
        public string Id { get; set; }
        public string Ip { get; set; }
    }

After identifying the structure, my next step was to actually parse the xml file. After a few minutes of banging my head against the wall, I ended up with this. I use an XmlReader to scan forward through the file and the LINQ to XML bits to break off the page elements.

    public class WikiFileParser {
        string fileName;

        public WikiFileParser(string fileName){
            this.fileName = fileName;
        }

        public IEnumerable<XElement> GetPages() {
            var file = new StreamReader(fileName);
            var reader = XmlReader.Create(file);

            while (reader.Read()) {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == "page"){
                    XElement x = XNode.ReadFrom(reader) as XElement;
                    if (x != null)
                        yield return x;
                }
            }
        }
    }
The reason I chose this method is that I needed to keep a minimal amount of data in memory and I wanted to use the convenience of the LINQ to XML api to populate my objects.
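To make that concrete, here's a small self-contained sketch of the same scan-forward technique, with an inline string standing in for the real dump (the whitespace between the page elements matters here, since `XNode.ReadFrom` leaves the reader positioned just past the element it consumed):

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;
using System.Xml.Linq;

// Same pattern as GetPages, on a tiny inline document: XmlReader walks the
// stream and XNode.ReadFrom lifts only the current <page> into LINQ to XML.
var xml = "<mediawiki><page><title>A</title></page> <page><title>B</title></page></mediawiki>";
var reader = XmlReader.Create(new StringReader(xml));
var titles = new List<string>();
while (reader.Read())
    if (reader.NodeType == XmlNodeType.Element && reader.Name == "page")
        titles.Add(((XElement)XNode.ReadFrom(reader)).Element("title").Value);

Console.WriteLine(string.Join(",", titles)); // A,B
```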

Before I get to the meat, I need to share a few extension methods:

    public static XElement Named(this XElement elm, string name){
        var newElm = elm.Element("{}" + name);
        if (newElm == null)
            newElm = new XElement("dummy");
        return newElm;
    }
This one is to get around some weird namespace issues that I encountered. If someone knows how I can avoid this, please let me know!
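For what it's worth, here's the helper at a call site — assuming the `Named` extension above is in scope, a missing element falls back to an empty value instead of a null reference:

```csharp
using System;
using System.Xml.Linq;

// A page without a <redirect> element: Named returns the dummy element,
// so .Value is "" rather than a NullReferenceException.
var page = XElement.Parse("<page><title>Foo</title></page>");
Console.WriteLine(page.Named("title").Value);           // Foo
Console.WriteLine(page.Named("redirect").Value.Length); // 0
```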

    public static IEnumerable<IEnumerable<T>> Chunk<T>(this IEnumerable<T> pages, int count){
        List<T> chunk = new List<T>();
        foreach (var page in pages){
            chunk.Add(page);
            if (chunk.Count == count){
                yield return chunk.ToList();
                chunk.Clear();
            }
        }
        if (chunk.Count > 0)
            yield return chunk.ToList();
    }

This one is kind of interesting. I'm using this to batch up single entities into clumps so that I can perform bulk operations.
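A quick sanity check of the batching behavior, assuming `Chunk` flushes each full batch and then whatever remains (the numbers here are arbitrary):

```csharp
using System;
using System.Linq;

// Ten items in chunks of four: two full batches plus a final partial one.
var batches = Enumerable.Range(1, 10).Chunk(4).Select(b => b.ToList()).ToList();
Console.WriteLine(batches.Count);    // 3
Console.WriteLine(batches[2].Count); // 2
```

(As an aside, .NET 6 later shipped a built-in `Enumerable.Chunk` with the same shape.)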

Finally, on to the meat!
I'll start by posting the code and then I'll explain each method in detail.

    public class BulkLoader{
        string uri;
        public BulkLoader(string uri) { this.uri = uri; }

        public void Load(string filename){
            Action<IEnumerable<XElement>> saveAction = Save;
            var file = new WikiFileParser(filename);

            var workers = file.GetPages()
                .Chunk(1000)
                .Select(x => saveAction.BeginInvoke(x, null, null))
                .Aggregate(new Queue<IAsyncResult>(),
                           (queue, item) =>{
                               queue.Enqueue(item);
                               if (queue.Count > 5)
                                   queue.Dequeue().AsyncWaitHandle.WaitOne();
                               return queue;
                           });

            //Wait for the last bit to finish
            workers.All(x => x.AsyncWaitHandle.WaitOne());
        }

        void Save(IEnumerable<XElement> elms){
            var json = new { docs = elms.Select(x => MakePage(x)) }.ToJson(false);
            var request = WebRequest.Create(uri);
            request.Method = "POST";
            request.Timeout = 90000;

            var bytes = Encoding.UTF8.GetBytes(json);
            request.ContentType = "application/json; charset=utf-8";
            request.ContentLength = bytes.Length;

            using (var writer = request.GetRequestStream()){
                writer.Write(bytes, 0, bytes.Length);
            }
            //Make sure the request completes and the connection is released
            request.GetResponse().Close();
        }

        Page MakePage(XElement x){
            var rev = x.Named("revision");
            var who = rev.Named("contributor");
            return new Page(){
                Title = x.Named("title").Value,
                Redirect = x.Named("redirect").Value,
                Id = x.Named("id").Value,
                Revision = new Revision(){
                    Id = rev.Named("id").Value,
                    Timestamp = Convert.ToDateTime(rev.Named("timestamp").Value),
                    Contributor = new Contributor(){
                        Id = who.Named("id").Value,
                        Username = who.Named("username").Value,
                        Ip = who.Named("ip").Value
                    },
                    Minor = rev.Named("minor").Value,
                    Comment = rev.Named("comment").Value,
                    Text = rev.Named("text").Value
                }
            };
        }
    }

MakePage is responsible for turning the xml into a plain object. Here is where I needed that funky extension method to keep from typing the namespace over and over. There's not much to see here.

Save is responsible for persisting the object to CouchDB. Alex Robson will roll his eyes when he sees that part. He has written a tremendously awesome .NET CouchDB API called Relax, which you can find on GitHub. The reason I decided not to use his API is that I was trying to eke out every bit of speed I could find.

Load is where the magic happens. This is the part that tripped me up for quite a while. At first I was using Parallel.ForEach and I kept running into out of memory exceptions. It wasn't until Alex put up this blog post that I saw where I went wrong. I used a similar approach, but with plain old IEnumerable instead of IObservable / Reactive Extensions.

Then, Alex decided to one-up me the other night before I even finished writing this post. You see, the problem with the original solution is that it would collect 5 asynchronous tasks and then wait for them all to finish before starting 5 more. My implementation suffered the same problem.

Sing with me: "Anything Rx can do, IEnumerable can do with more coooooode." I'm sorry I just put you through that; my musical skills are definitely lacking. I really should get back to explaining the code now. So, I start by looking at the stream of pages. I collect 1000 of them and then fire off an asynchronous invoke of the batch save I described above. The result of that is a handle to the asynchronous task, which I can use to track its status. I then aggregate those handles into a queue which collects up to 5 tasks; once it's full, I dequeue the oldest task and wait until it's finished before starting the next.
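Stripped of the wiki-specific bits, the throttling pattern boils down to just a few lines. This is a hypothetical generalization — the `limit` and `work` parameters are mine, not from the original code:

```csharp
using System;
using System.Collections.Generic;

// Keep at most `limit` invocations of `work` in flight: enqueue each new
// async handle, and once over the limit, block on the oldest handle so a
// new task only starts as an old one finishes.
static void ThrottledRun<T>(IEnumerable<T> items, Action<T> work, int limit){
    var inFlight = new Queue<IAsyncResult>();
    foreach (var item in items){
        inFlight.Enqueue(work.BeginInvoke(item, null, null));
        if (inFlight.Count > limit)
            inFlight.Dequeue().AsyncWaitHandle.WaitOne();
    }
    // Drain whatever is still running.
    while (inFlight.Count > 0)
        inFlight.Dequeue().AsyncWaitHandle.WaitOne();
}
```

Note that delegate `BeginInvoke` only works on the .NET Framework; on modern .NET you'd reach for `Task` and `SemaphoreSlim` to get the same effect.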

The result? I can keep 5 tasks running constantly, always pumping data. Since the data set is rather large and I'm using fairly good-sized batches, this produces quite a bit of memory pressure. If you want to try this at home with less memory, back down the batch size.

This was a fun task to do. Since I've already lost most readers somewhere about 15 paragraphs back, I'll save the performance numbers for another post.