Streaming #Pig

As a C# developer, you have a number of opportunities to write code that is either used by or interacts with a Hadoop/HDInsight cluster. A number of these have been well publicized and documented. In fact, there is an entire .Net SDK for Hadoop that allows you to easily write streaming MapReduce jobs, manage your cluster and even write LINQ-based queries against Hive (among other things).

One option that you may not have been aware of is the streaming feature built into Pig. Pig Streaming allows you to write a C# or VB.Net executable to handle complex processing that would typically be handled by something like a custom Pig User-Defined Function written in Java.

How it works

The streaming interface within Pig should not be confused with what you may or may not know about MapReduce streaming. It stands entirely on its own and allows non-Java UDFs and other user-defined binaries or scripts to handle data within a Pig job by passing in relations as input and receiving relations as output. Binaries or scripts can be written in any language, including Python, Perl, Java and of course C#/VB.Net (the focus of this blog).

As you consider this functionality there are several design points/considerations to be aware of:

  • Streaming jobs act a lot like an asynchronous SSIS transform in that there is neither a temporal nor a causal link between the data input and the output received back. For example, it's perfectly permissible to perform a task such as a pivot or unpivot within a streaming executable.
  • When working in a cluster, be aware that underneath the pretty Pig covers one or more MapReduce jobs are running. These jobs can be, and are, run in parallel, which could affect how data is streamed to your assembly. There are three specific data guarantees provided by the Pig team:
    1. By default data is delivered unordered
    2. If data is grouped (using the GROUP operator) in your Pig Latin script, data will be delivered contiguously to a single node.
    3. If data is grouped and ordered in your Pig Latin script, data will be delivered contiguously to a single node sorted by the user-defined key.
  • When working with C# or VB.Net, the entire relation is passed as input and must be parsed out; no schema metadata is included.
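
To make the first two points concrete, here is a minimal sketch of an executable that emits fewer rows than it receives by leaning on the contiguous-delivery guarantee for grouped data. It assumes the rows reach the executable as flat, tab-delimited "key, amount" lines with all rows for a key arriving together (for example after flattening the grouped relation in the Pig script). The GroupTotals class, the Run helper and the two-column layout are hypothetical names chosen for illustration.

```csharp
using System;
using System.Globalization;
using System.IO;

namespace PigStreamer
{
    public static class GroupTotals
    {
        // Reads "key<TAB>amount" lines and writes one "key<TAB>total" line
        // for each run of consecutive rows that share the same key.
        public static void Run(TextReader input, TextWriter output)
        {
            string currentKey = null;
            long total = 0;
            string line;

            while ((line = input.ReadLine()) != null)
            {
                string[] columns = line.Split('\t');
                long amount;

                // Assumption: rows with a missing or non-numeric amount are skipped.
                if (columns.Length < 2 ||
                    !long.TryParse(columns[1], NumberStyles.Integer,
                                   CultureInfo.InvariantCulture, out amount))
                {
                    continue;
                }

                // A change of key closes the previous run, so emit its total.
                if (currentKey != null && columns[0] != currentKey)
                {
                    output.WriteLine("{0}\t{1}", currentKey, total);
                    total = 0;
                }

                currentKey = columns[0];
                total += amount;
            }

            // Emit the final run, if any rows were read.
            if (currentKey != null)
            {
                output.WriteLine("{0}\t{1}", currentKey, total);
            }
        }

        public static void Main(string[] args)
        {
            Run(Console.In, Console.Out);
        }
    }
}
```

If the input is not grouped, rows for the same key may arrive in separate runs (or on separate nodes), and this sketch would emit several partial totals for that key.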

Writing a Streaming C# Assembly

There is no real trick to writing a streaming assembly using the .Net framework. Starting with a Windows Console project within Visual Studio, you read from the standard input stream, do whatever work is required and write to the standard output stream. Along the way, the input must be parsed and the output must be formatted using the appropriate Pig field delimiter (tab by default). The example below shows the basic code required to perform these functions.

using System;
using System.IO;

namespace PigStreamer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Hook to the standard output
            using (StreamWriter output = new StreamWriter(Console.OpenStandardOutput()))
            {
                //Hook to the standard input
                using (StreamReader input = new StreamReader(Console.OpenStandardInput()))
                {
                    //Read the first line
                    string line = input.ReadLine();

                    //Now Iterate.....
                    while (line != null)
                    {
                        //Split the line into columns (tab is the default delimiter)
                        string[] columns = line.Split(new char[] { '\t' });

                        //Write the output delimited and formatted as a single line
                        output.WriteLine(string.Format("{0}\t{1}", columns[0], columns[1]));

                        line = input.ReadLine();
                    }
                }

            }
        }
    }
}
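
Because no schema travels with the data, the executable has to impose its own. The sketch below is one way to do that, assuming a hypothetical two-column input of (name, visits). The VisitRow and TypedStreamer names, the threshold of 10 and the choice to treat an empty or unparsable count as missing are all assumptions made for illustration, not Pig behavior.

```csharp
using System;
using System.Globalization;
using System.IO;

namespace PigStreamer
{
    // Hypothetical row layout: column 0 is a name, column 1 is an integer count.
    public sealed class VisitRow
    {
        public string Name;
        public int? Visits;

        public static VisitRow Parse(string line)
        {
            string[] columns = line.Split('\t');
            var row = new VisitRow();
            row.Name = columns[0];

            // Leave Visits as null when the field is absent, empty or not a number.
            int visits;
            if (columns.Length > 1 &&
                int.TryParse(columns[1], NumberStyles.Integer,
                             CultureInfo.InvariantCulture, out visits))
            {
                row.Visits = visits;
            }
            return row;
        }
    }

    public static class TypedStreamer
    {
        public static void Run(TextReader input, TextWriter output)
        {
            string line;
            while ((line = input.ReadLine()) != null)
            {
                VisitRow row = VisitRow.Parse(line);
                int visits = row.Visits ?? 0;

                // Emit three tab-delimited columns: name, count and a derived label.
                output.WriteLine("{0}\t{1}\t{2}", row.Name, visits,
                                 visits >= 10 ? "frequent" : "occasional");
            }
        }

        public static void Main(string[] args)
        {
            Run(Console.In, Console.Out);
        }
    }
}
```

Keeping the parsing in one place means a change to the relation's column order only touches VisitRow.Parse, and taking a TextReader and TextWriter lets you test the logic without a cluster.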

Streaming through Pig

After you have written, compiled and tested your executable, it is ready to be referenced in your Pig script. The most basic syntax is seen below and includes the external assembly as well as the defined output schema. Note that Pig Latin expects the command to be enclosed in backticks:

Y = stream X through `c:\temp\PigStreamApp.exe` as (col1, col2, col3);
dump Y;

This example can be expanded by moving the reference into a named definition using the DEFINE statement. Optionally, the SHIP command can be used to push the executable from the client to your computational nodes.

DEFINE streamer `c:\temp\PigStreamApp.exe` SHIP('c:\temp\PigStreamApp.exe');
Y = stream X through streamer as (col1, col2, col3);
dump Y;

If your assembly has runtime parameters or arguments, they can be passed in as follows and will be available in the args parameter passed to your executable's Main method.

DEFINE streamer `c:\temp\PigStreamApp.exe -myparameter=5`;
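
On the C# side, one way to pick up such an argument is sketched below. It assumes the value arrives as a single token of the form -name=value in args. The Arguments class, the default of 1 and the decision to report the value on standard error are hypothetical choices for illustration.

```csharp
using System;
using System.Collections.Generic;

namespace PigStreamer
{
    public static class Arguments
    {
        // Turns tokens such as "-myparameter=5" into name/value pairs.
        // A token without '=' is stored with an empty value.
        public static Dictionary<string, string> Parse(string[] args)
        {
            var options = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
            foreach (string arg in args)
            {
                string trimmed = arg.TrimStart('-');
                int separator = trimmed.IndexOf('=');
                if (separator < 0)
                {
                    options[trimmed] = "";
                }
                else
                {
                    options[trimmed.Substring(0, separator)] = trimmed.Substring(separator + 1);
                }
            }
            return options;
        }

        public static void Main(string[] args)
        {
            Dictionary<string, string> options = Parse(args);

            // Hypothetical default, used when the argument is absent or not a number.
            int myParameter = 1;
            string raw;
            int parsed;
            if (options.TryGetValue("myparameter", out raw) && int.TryParse(raw, out parsed))
            {
                myParameter = parsed;
            }

            // Diagnostics go to standard error so they never mix with the rows
            // written to standard output.
            Console.Error.WriteLine("myparameter = {0}", myParameter);
        }
    }
}
```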

Wrap-Up

This post demonstrated just one of the many options available to .Net developers for handling complex data processing scenarios: the streaming functionality built into Pig. By leveraging this capability, you can write either C# or VB.Net code to handle data processing directly from a Pig job without having to get involved with Java. This hopefully lowers the bar and helps you get a jump start on taking advantage of Pig to handle your data processing needs.

Till next time!

Chris