3 Little Piggies: Advanced #Pig Join Scenarios

One of the most common operations in any Pig job is the join. A join, much like the ones you work with in SQL Server, brings together two sets of data into one. These joins can happen in multiple ways, and Pig supports most of what you would expect, including inner joins as well as left, right and full outer joins. Beyond these, however, there are three special (advanced) join scenarios that are fairly typical in Big Data environments: replicated, skewed and merge.
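
For reference, the basic join syntax is straightforward. Here is a minimal sketch (the relation and field names are hypothetical) showing an inner join and a left outer join:

-- hypothetical data sets for illustration
customers = load 'customers' as (id, name);
orders = load 'orders' as (customer_id, amount);

-- inner join: only rows that match on both sides
matched = join customers by id, orders by customer_id;

-- left outer join: keep every customer, even those with no orders
everyone = join customers by id left outer, orders by customer_id;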

Replicated

Before we start, it's important to remember that Pig is an abstraction on top of the MapReduce jobs that do the actual heavy lifting. When you join two big sets of data that are fragmented or distributed across multiple machines, a reducer typically has to sort the data before it can be joined. In scenarios where you are joining one big set of data to a small(er) set, this can be especially inefficient, particularly when the smaller set would fit in memory on the data/compute nodes. Let's look at an example using a customer transaction data set, which could potentially have billions of rows, joined to a smaller geography data set.

-- the large data set: potentially billions of rows
transactions = load 'customer_transactions' as (
    fname, lname, city, state, country, amount, tax);
-- the small data set
geography = load 'geo_data' as (state, country, district, manager);

sales = join transactions by (state, country),
    geography by (state, country);

As written, this job requires both a Map and a Reduce phase to make the join work, which becomes more and more inefficient as the customer data set grows. This is the exact scenario that the replicated join optimizes.

The replicated join tells Pig to distribute the geography set to each node, where it can be joined directly in the Map phase, eliminating the need for the Reduce phase altogether. To use this feature, we simply change the syntax as follows:

transactions = load 'customer_transactions' as (
    fname, lname, city, state, country, amount, tax);
geography = load 'geo_data' as (state, country, district, manager);

-- 'replicated' loads the second relation into memory on every node
sales = join transactions by (state, country),
    geography by (state, country)
    using 'replicated';

Note that Pig treats the second relation in the join as the smaller data set and does not do any checking on your behalf; if that relation does not actually fit in memory, the job will fail.
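
Incidentally, the replicated join is not limited to two relations. As a sketch (the extra 'store_data' relation here is hypothetical), every relation listed after the first is replicated into memory, so all of them need to be small:

stores = load 'store_data' as (state, country, store_name);

-- everything after the first relation is replicated, so both
-- geography and stores must fit in memory
sales = join transactions by (state, country),
    geography by (state, country),
    stores by (state, country)
    using 'replicated';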

Skewed

A second scenario that happens frequently results when the data is unbalanced, or skewed, around the join keys. To better illustrate this concept, we can revisit the example above. To make this example work, we will assume that our geography set no longer fits in memory and that the bulk of our customer transactions happen in the state of Florida (United States). Using the same initial example from above would result in the majority of the transactions data being sent to a single reducer, while the reducers handling other states/regions have comparatively little work to do. Those other reducers would finish quickly, with the bottleneck being the single reducer handling the Florida transactions. This is called skew.

The skewed join in Pig handles this scenario by pre-sampling the data in a preliminary MapReduce job. During the pre-sampling, Pig identifies keys whose data is skewed; records for those keys are then automatically split across multiple reducers. A second MapReduce job is run to perform the actual join.

transactions = load 'customer_transactions' as (
    fname, lname, city, state, country, amount, tax);
geography = load 'geo_data' as (state, country, district, manager);

-- 'skewed' samples the data first, then spreads heavy keys
-- across multiple reducers
sales = join transactions by (state, country),
    geography by (state, country)
    using 'skewed';
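
If memory is tight on the reducers, Pig exposes a knob for this join: the pig.skewedjoin.reduce.memusage property controls the fraction of reducer heap the skewed join may use when deciding how to partition a heavy key. A sketch, with an illustrative value only:

-- fraction of heap the skewed join may use per reducer (0.3 is illustrative)
set pig.skewedjoin.reduce.memusage 0.3;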

Merge

Occasionally, the data sets you are interested in joining are pre-sorted. Since the sort operation in the underlying MapReduce jobs is an expensive process, it can be avoided and the two sets can be joined in a much more efficient and economical way. Behind the scenes, much like the skewed join, a preparatory MapReduce job is run to build an index of the join key at the beginning of each split block. During the join, records are either matched or discarded. No safety check is done to ensure the data is indeed sorted. The syntax for this join operation is listed below:

transactions = load 'customer_transactions' as (
    fname, lname, city, state, country, amount, tax);
geography = load 'geo_data' as (state, country, district, manager);

-- 'merge' assumes both inputs are already sorted on the join keys
sales = join transactions by (state, country),
    geography by (state, country)
    using 'merge';
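
Since Pig will not verify the sort order for you, one conservative pattern is to sort and store each set once, then merge join against the sorted copies in later scripts. A minimal sketch (the output paths are hypothetical):

-- one-time prep: sort each set on the join keys and store it
sorted_tx = order transactions by state, country;
store sorted_tx into 'customer_transactions_sorted';

sorted_geo = order geography by state, country;
store sorted_geo into 'geo_data_sorted';

-- later scripts load the sorted copies and merge join directly
transactions = load 'customer_transactions_sorted' as (
    fname, lname, city, state, country, amount, tax);
geography = load 'geo_data_sorted' as (state, country, district, manager);
sales = join transactions by (state, country),
    geography by (state, country)
    using 'merge';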

Wrap-Up

Joins are one of the most common operations in the relational database world, and they are just as common in the world of big data. This post discussed some of the more advanced scenarios that occur when joining big, distributed data sets. The replicated join is used when one data set is large and the other is small enough to fit entirely in memory, thereby avoiding an unnecessary reduce operation. The skewed join efficiently handles data sets that are not balanced across the join keys, and finally the merge join efficiently handles pre-sorted data.

Till next time!

Chris