Merging small files on HDInsight

The situation

The following is a dramatically enhanced story inspired by true events.

It's weird how we can see our signal in the daytime. I guess that's why Microsoft chose the Seattle area: all these clouds provide a great surface to project on. But here it was: the ominously cute elephant on the faint blue window. A customer needed our help on HDInsight.

I rushed through the corridors swatting those pesky bats out of the way to reach my terminal and enter cyberspace. There I was met and briefed by my TFS virtual assistant: the customer had a data pipeline that involved many devices uploading millions of small (~5 KB) XML files to Azure Blob storage, and then wanted to analyze this data using HDInsight.

The clues

I immediately realized the problem here: Hadoop doesn't like many small files. It much prefers big juicy files that it can ravenously churn through. And it's understandable: if you're going to churn through terabytes of data, the last thing you want is to go back to your storage layer (be it HDFS, Azure Storage, or whatever) every 5 KB to ask for another 5 KB. You'd much rather each of your mapper minions ask for bigger chunks, hundreds of MB at a time, that it can work through. So having these millions/billions of puny 5 KB files is a very bad scenario for responsive analytics using Hadoop.

Of course, our advance troops of excellent customer engagement and customer support people had also realized this and resorted to the oldest trick of data analysis: pre-processing. No problem, they said: we'll just launch a massive job to consolidate these files first thing, then analyze our data at ease after that. It's going to be slow and not pretty, but we can always send this job to those thousands of tireless machines flying up there in the cloud before we go to bed, and wake up to our nicely processed big files to analyze. So they wrote this awesomely simple Pig script to do this job:

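-- Ask Pig to combine small inputs into splits of up to 256 MB (268435456 bytes).
set pig.splitCombination true;
set pig.maxCombinedSplitSize 268435456;
-- Load the small files and store them back out as consolidated output.
files = LOAD 'wasb://customercontainer@customeraccount.blob.core.windows.net/directoryWithData' AS (line:chararray);
STORE files INTO 'wasb://processedcontainer@customeraccount.blob.core.windows.net/processed' USING PigStorage('\n');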

Unfortunately that's when they hit the snag: the job crashed and burned before it even properly launched! It would work OK if launched on, say, 100K files, but if launched against 400K files the job would churn for hours and usually run out of memory before it even launched any mappers. That's when they raised the signal: "what's going on here?" they very reasonably asked.

The deep problem

It was another classic dilemma of distributed processing: how much planning do you do upfront before launching a massively parallel job to make sure all these workers are well utilized? If the executive at the top spends too many weeks strategizing, the rest of the org is underutilized and nothing gets done. But if she just jumps into action with little thinking, all these people will flounder about madly trying to get things done in a massive chaos that yields frustration instead of that elusive insight. This was a case of executive over-strategizing.

Pig uses a technique popularized in Hadoop by the CombineFileInputFormat class when given many files to analyze (see PIG-1518 for Pig's implementation of this technique). It was optimized for hundreds/thousands of files in a DFS setting where data locality is highly valued. The way it works is that it gets all the blocks for all the files that will be processed, then tries to group them together into combined splits whose data lives together on the same machine (hopefully to be picked up by a mapper on that machine) and whose total size doesn't exceed maxCombinedSplitSize (256 MB in our case). This is awesome for its intended purposes, horrible for us: we don't care about data locality - under the covers in WASB we actually lie to Hadoop and tell it that all the data blocks for WASB are on "localhost" since there's no easy way for us to tell it "it's in the cloud, don't bother". And even listing all ~200 M files we wanted to process takes a long time, not to mention running all this needlessly complicated logic over them. Clearly we needed to bypass Pig and get our hands dirty with our own custom input format, so we could find our own balance between planning and jumping in.
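To make the "over-strategizing" concrete, here is a minimal sketch of the size-capped grouping idea. It is not Pig's or Hadoop's actual implementation: it ignores data locality entirely, and the SplitCombiner class and combine() method are hypothetical names made up for illustration. What it does show is that the planner has to hold the complete list of file sizes before it can emit a single split.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the CombineFileInputFormat algorithm.
final class SplitCombiner {
    /** Greedily packs file sizes (in bytes) into groups no larger than maxSplitBytes. */
    static List<List<Long>> combine(List<Long> fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Close the current split if adding this file would exceed the cap.
            if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        // Emit whatever is left over as the final split.
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }
}
```

With 5 KB files and the 256 MB cap from the script, each split would hold roughly 52,000 files, so the grouping itself is cheap. The expensive part is building the input list, which happens on a single machine before any mapper starts.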

The attack

So this is where I finally show you the code, dear brave reader who stayed with me so far. All the code for this case is up on my GitHub project very creatively called SmallFilesMerge. My first approach to this problem was simple enough: I knew that the files were organized in directories at a rough granularity, so I decided that the input format would just list those directories and then assign each mapper a directory. So I wrote the CombineDirectoryInputFormat class whose getSplits() method did just that, handing out CombineDirectoryInputSplit objects whose only member is the directory name. The DirectoryFileNameRecordReader class, which reads out the records from the input split on each mapper, then listed the files in the directory and handed the file paths out to the DirectoryMergeMapper class, which finally would just read each file and dump the content as output. Phew.
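The division of labor can be sketched in plain Java, with the local file system standing in for blob storage. This is not the code from SmallFilesMerge: the real classes plug into Hadoop's input format and record reader machinery, which is left out here, and DirectoryPlanSketch, planSplits() and filesFor() are hypothetical names. The point is only that planning touches directory names, while the per-file listing is deferred to whoever owns the split.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the input format / record reader pair.
final class DirectoryPlanSketch {
    /** Planning step: one split per subdirectory; no files are listed here. */
    static List<Path> planSplits(Path root) throws IOException {
        List<Path> splits = new ArrayList<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(root)) {
            for (Path child : children) {
                if (Files.isDirectory(child)) {
                    splits.add(child);
                }
            }
        }
        return splits;
    }

    /** Task step: runs on the worker and lists only its own directory. */
    static List<Path> filesFor(Path split) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(split)) {
            for (Path child : children) {
                if (Files.isRegularFile(child)) {
                    files.add(child);
                }
            }
        }
        return files;
    }
}
```

The planning cost here grows with the number of directories rather than the number of files, which is the trade this design is after.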

So that was my first try. Took me a couple of hours to code it up, then spent some time writing some synthetic files to test with (see the DirectoryPopulatorX classes) and off to test. It was OK, but not nearly fast enough. A couple of main problems were glaring:

  1. Each directory in the real customer case contained ~500 K files, which is a bit more than a single mapper should be expected to handle.
  2. At the beginning of each map task, the simple listing of files within the directory was taking too long.

The first problem was simple enough to tackle: I just implemented a crude hash-partitioning of the files within the directory. The input format class, instead of handing out a single split per directory, would hand out N splits (where N was configurable), each of which is assigned a number in the [0, N) range. The record reader would then check, for each file, whether ABS(Hash(fileName) % N) equals the assigned number (I missed the ABS() part in my first try - nasty bug) and only hand the file out to the mapper if that's the case.
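Here is a minimal sketch of that check, assuming Java's String.hashCode() as the hash function (the hash actually used in the project may differ; HashPartitionSketch and its method names are hypothetical). In Java the % operator keeps the sign of its left operand, so without ABS() roughly half of the file names can land on a negative number that no reader is assigned.

```java
// Hypothetical illustration of the hash-partitioning check.
final class HashPartitionSketch {
    /** Buggy variant: the result can be negative when hash is negative. */
    static int partitionWithoutAbs(int hash, int n) {
        return hash % n;
    }

    /** Fixed variant, ABS(hash % n): always lands in [0, n). */
    static int partition(int hash, int n) {
        // |hash % n| is always smaller than n, so Math.abs cannot overflow here.
        return Math.abs(hash % n);
    }

    /** True if the reader assigned the number `assigned` should hand this file to its mapper. */
    static boolean belongsTo(String fileName, int assigned, int n) {
        // Assumption: String.hashCode() stands in for whatever hash the real code uses.
        return partition(fileName.hashCode(), n) == assigned;
    }
}
```

Taking the absolute value after the modulo matters: Math.abs(hash) % n would still go negative for Integer.MIN_VALUE. Math.floorMod(hash, n) is another way to stay in [0, N), although it assigns negative hashes to different partitions than ABS() does.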

The second one was a bit more complex. The problem was that I was going through the WASB FileSystem abstraction, and there the API used is the listStatus() method which returns an array of FileStatus objects. This is problematic in two ways:

  1. It's a batch instead of streaming API, so we have to get all the data before handing out to the mapper.
  2. In order to properly construct the FileStatus object, WASB uses a heavy-weight API to Azure Storage to list every blob with its metadata (to e.g. tell if a blob was an actual blob not an empty-directory-indicating blob). I knew that if I go to Azure Storage directly I could use the lighter-weight API to just retrieve blob names with no metadata.

So I decided to add a mode where I use the Azure Storage API directly instead of going through WASB. I trigger this mode by including the storage key for the account in the job configuration, which allows me to bypass WASB - see the UseStorageApiReader class within the DirectoryFileNameRecordReader class for the gory implementation details.

So with these optimizations in hand (and after fixing/working around other bugs such as the awesome XML processing bug in Java that I'll skip talking about in order to keep this already way-too-verbose post in check), we unleashed 140 nodes of HDInsight power on our ~200 M files. It worked - barely. Mappers were launched quickly (kudos to our new no-nonsense jump-into-action executive), and at a good granularity to utilize the whole cluster, but each mapper could only do ~7 files per second, so the whole cluster could do 140 x 4 x 7 = ~4 K files per second, which meant it took ~14 hours to go through the whole thing. I prototyped some code that allows sharding across multiple storage accounts (will blog about this topic another day), but since we already had this data in a single storage account that didn't help. In the end we settled on what we had and moved on to the next phases of the data processing with the customer.
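The back-of-the-envelope numbers above can be reproduced with a few lines of Java. The sketch reads the "4" in 140 x 4 x 7 as the number of concurrent mappers per node, which is an assumption since the figure isn't spelled out, and ThroughputEstimate is a hypothetical helper, not part of the project.

```java
// Hypothetical helper for the throughput estimate.
final class ThroughputEstimate {
    /** Cluster-wide rate: nodes x mappers per node x files per mapper per second. */
    static long filesPerSecond(int nodes, int mappersPerNode, int filesPerMapperPerSecond) {
        return (long) nodes * mappersPerNode * filesPerMapperPerSecond;
    }

    /** Wall-clock hours needed to process totalFiles at the given rate. */
    static double hoursToProcess(long totalFiles, long filesPerSecond) {
        return totalFiles / (double) filesPerSecond / 3600.0;
    }
}
```

Plugging in 140 nodes, 4 mappers and 7 files per second gives 3,920 files per second, and 200 million files at that rate comes out to a little over 14 hours, in line with what we saw.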

Parting thoughts

  1. Having many tiny files like that in the beginning of your data processing is evil. I've worked with other teams in Microsoft that skirted the problem altogether by changing their data ingestion service to do the consolidation as the data came in, and they had much more success with that approach. If you can do that, I'd highly encourage it.
  2. Hadoop is very extensible. If what you're trying to do at the simpler, higher-level layer doesn't work, chances are there is a way to do it, but one that involves getting your hands dirty with coding at the lower layers.
  3. Always double-check yourself and look for anomalies when doing data analysis work. The way I caught the missing ABS() bug in my hash-partitioning code is that I noticed that my first partition (hash value 0) was always about twice as big as any other partition. I was tempted to ignore it and chalk it up to the vagaries of hashing, but on a deeper look I found the error. Admittedly I should've done a better job unit-testing my work (it's amazing how even a staunch unit tester like me would abandon everything when working under the gun), but in general it's always a good idea to sanity-check your data processing work.