Running Apache Pig (Pig Latin) on Apache Hadoop on Windows Azure

The Microsoft Distribution of Apache Hadoop comes with Pig support, along with an interactive JavaScript shell, so users can run Pig queries immediately without any extra configuration. In other words, the Apache Hadoop distribution running on Windows Azure has built-in support for Apache Pig.

Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.

At the present time, Pig's infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, for which large-scale parallel implementations already exist (e.g., the Hadoop subproject). Pig's language layer currently consists of a textual language called Pig Latin, which has the following key properties:

  • Ease of programming: It is trivial to achieve parallel execution of simple, "embarrassingly parallel" data analysis tasks. Complex tasks composed of multiple interrelated data transformations are explicitly encoded as data flow sequences, making them easy to write, understand, and maintain (see the sketch after this list).
  • Optimization opportunities: The way in which tasks are encoded permits the system to optimize their execution automatically, allowing the user to focus on semantics rather than efficiency.
  • Extensibility: Users can create their own functions to do special-purpose processing.
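
To give a flavor of that dataflow style, here is a minimal Pig Latin sketch. The input file visits.log and its fields are hypothetical, chosen only to illustrate how each statement names one step of a LOAD, FILTER, GROUP, COUNT pipeline:

   -- Load a hypothetical tab-separated visit log: (user, url, time)
   visits  = LOAD 'visits.log' USING PigStorage('\t') AS (user, url, time);
   -- Keep only visits to pages served over HTTPS
   secure  = FILTER visits BY url MATCHES 'https://.*';
   -- Count visits per user
   grouped = GROUP secure BY user;
   counts  = FOREACH grouped GENERATE group AS user, COUNT(secure) AS n;
   -- Write the (user, n) pairs back out
   STORE counts INTO 'visit_counts' USING PigStorage();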

Apache Pig has two execution modes or exectypes:

  • Local Mode: To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local).

Example:

   $ pig -x local

  • Mapreduce Mode: To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, but don't need to, specify it using the -x flag (pig OR pig -x mapreduce).

Example:

   $ pig
   $ pig -x mapreduce

You can run Pig in either mode using the "pig" command (the bin/pig script) or the "java" command (java -cp pig.jar ...). To learn more about Apache Pig, see the Apache Pig site at https://pig.apache.org/.
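For example, the tutorial script used later in this post can be launched either way; the pig.jar path below is an assumption and depends on where Pig is installed on your machine:

   $ pig script1-hadoop.pig
   $ java -cp pig.jar org.apache.pig.Main script1-hadoop.pig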

After you have configured your Hadoop cluster on Windows Azure, you can remotely log in to it. To run the Pig script below, copy the sample Pig tutorial files (tutorial.jar, excite.log.bz2 and script1-hadoop.pig) into the C:\Apps\dist\pig folder.

Now, launch the Hadoop Command Line shortcut and run the commands below (the paths assume the tutorial files were copied to c:\Apps\dist\pig as above):

   cd c:\Apps\dist\pig
   hadoop fs -copyFromLocal excite.log.bz2 excite.log.bz2
   pig
   grunt> run script1-hadoop.pig
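If the script cannot find its input, it is worth confirming that the copy landed in your HDFS home directory; a quick check from the same Hadoop command line:

   hadoop fs -ls excite.log.bz2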

Once the job has started, you can see its details in the JobTracker web UI (http://localhost:50030/jobtracker.jsp).

script1-hadoop.pig:

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Query Phrase Popularity (Hadoop cluster)

-- This script processes a search query log file from the Excite search engine and finds search phrases that occur with particular high frequency during certain times of the day.

-- Register the tutorial JAR file so that the included UDFs can be called in the script.
REGISTER ./tutorial.jar;

-- Use the PigStorage function to load the excite log file into the "raw" bag as an array of records.
-- Input: (user,time,query)
raw = LOAD 'excite.log.bz2' USING PigStorage('\t') AS (user, time, query);

-- Call the NonURLDetector UDF to remove records if the query field is empty or a URL.
clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);

-- Call the ToLower UDF to change the query field to lowercase.
clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;

-- Because the log file only contains queries for a single day, we are only interested in the hour.
-- The excite query log timestamp format is YYMMDDHHMMSS.
-- Call the ExtractHour UDF to extract the hour (HH) from the time field.
houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;

-- Call the NGramGenerator UDF to compose the n-grams of the query.
ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;

-- Use the DISTINCT command to get the unique n-grams for all records.
ngramed2 = DISTINCT ngramed1;

-- Use the GROUP command to group records by n-gram and hour.
hour_frequency1 = GROUP ngramed2 BY (ngram, hour);

-- Use the COUNT function to get the count (occurrences) of each n-gram.
hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;

-- Use the GROUP command to group records by n-gram only.
-- Each group now corresponds to a distinct n-gram and has the count for each hour.
uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;

-- For each group, identify the hour in which this n-gram is used with a particularly high frequency.
-- Call the ScoreGenerator UDF to calculate a "popularity" score for the n-gram.
uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));

-- Use the FOREACH-GENERATE command to assign names to the fields.
uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;

-- Use the FILTER command to remove all records with a score less than or equal to 2.0.
filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;

-- Use the ORDER command to sort the remaining records by hour and score.
ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;

-- Use the PigStorage function to store the results.
-- Output: (hour, n-gram, score, count, average_counts_among_all_hours)
STORE ordered_uniq_frequency INTO 'script1-hadoop-results' USING PigStorage();
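
Once the STORE completes, the results sit in the script1-hadoop-results directory on HDFS. Two quick ways to look at them from the Grunt shell; the AS clause below simply re-applies the output schema described in the comment above:

   grunt> fs -cat script1-hadoop-results/part*
   grunt> results = LOAD 'script1-hadoop-results' AS (hour, ngram, score, count, mean);
   grunt> top10 = LIMIT results 10;
   grunt> DUMP top10;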

 

 

The full output of the job is shown below:

 

c:\Apps\dist\pig>pig
2012-01-10 07:22:23,273 [main] INFO  org.apache.pig.Main - Logging error messages to: c:\Apps\dist\pig\pig_1326180143273.log
2012-01-10 07:22:23,695 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://10.28.202.165:9000
2012-01-10 07:22:24,070 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: 10.28.202.165:9010
grunt> run script1-hadoop.pig
grunt> /*
grunt>  * Licensed to the Apache Software Foundation (ASF) under one
grunt>  * or more contributor license agreements.  See the NOTICE file
grunt>  * distributed with this work for additional information
grunt>  * regarding copyright ownership.  The ASF licenses this file
grunt>  * to you under the Apache License, Version 2.0 (the
grunt>  * "License"); you may not use this file except in compliance
grunt>  * with the License.  You may obtain a copy of the License at
grunt>  *
grunt>  *     https://www.apache.org/licenses/LICENSE-2.0
grunt>  *
grunt>  * Unless required by applicable law or agreed to in writing, software
grunt>  * distributed under the License is distributed on an "AS IS" BASIS,
grunt>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
grunt>  * See the License for the specific language governing permissions and
grunt>  * limitations under the License.
grunt>  */
grunt>
grunt> -- Query Phrase Popularity (Hadoop cluster)
grunt>
grunt> -- This script processes a search query log file from the Excite search engine and finds search phrases that occur with particular high frequency during certain times of the day.
grunt>
grunt>
grunt> -- Register the tutorial JAR file so that the included UDFs can be called in the script.
grunt> REGISTER ./tutorial.jar;
grunt>
grunt> -- Use the PigStorage function to load the excite log file into the "raw" bag as an array of records.
grunt> -- Input: (user,time,query)
grunt> raw = LOAD 'excite.log.bz2' USING PigStorage('\t') AS (user, time, query);
grunt>
grunt>
grunt> -- Call the NonURLDetector UDF to remove records if the query field is empty or a URL.
grunt> clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);
grunt>
grunt> -- Call the ToLower UDF to change the query field to lowercase.
grunt> clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;
grunt>
grunt> -- Because the log file only contains queries for a single day, we are only interested in the hour.
grunt> -- The excite query log timestamp format is YYMMDDHHMMSS.
grunt> -- Call the ExtractHour UDF to extract the hour (HH) from the time field.
grunt> houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;
grunt>
grunt> -- Call the NGramGenerator UDF to compose the n-grams of the query.
grunt> ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;
grunt>
grunt> -- Use the DISTINCT command to get the unique n-grams for all records.
grunt> ngramed2 = DISTINCT ngramed1;
grunt>
grunt> -- Use the GROUP command to group records by n-gram and hour.
grunt> hour_frequency1 = GROUP ngramed2 BY (ngram, hour);
grunt>
grunt> -- Use the COUNT function to get the count (occurrences) of each n-gram.
grunt> hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;
grunt>
grunt> -- Use the GROUP command to group records by n-gram only.
grunt> -- Each group now corresponds to a distinct n-gram and has the count for each hour.
grunt> uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;
grunt>
grunt> -- For each group, identify the hour in which this n-gram is used with a particularly high frequency.
grunt> -- Call the ScoreGenerator UDF to calculate a "popularity" score for the n-gram.
grunt> uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));
grunt>
grunt> -- Use the FOREACH-GENERATE command to assign names to the fields.
grunt> uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;
grunt>
grunt> -- Use the FILTER command to remove all records with a score less than or equal to 2.0.
grunt> filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;
grunt>
grunt> -- Use the ORDER command to sort the remaining records by hour and score.
grunt> ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;
grunt>
grunt> -- Use the PigStorage function to store the results.
grunt> -- Output: (hour, n-gram, score, count, average_counts_among_all_hours)
grunt> STORE ordered_uniq_frequency INTO 'script1-hadoop-results' USING PigStorage();
2012-01-10 07:22:48,614 [main] WARN  org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 3 time(s).
2012-01-10 07:22:48,614 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY,ORDER_BY,DISTINCT,FILTER
2012-01-10 07:22:48,614 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
2012-01-10 07:22:48,958 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: ordered_uniq_frequency: Store(hdfs://10.28.202.165:9000/user/avkash/script1-hadoop-results:PigStorage) - scope-71 Operator Key: scope-71)
2012-01-10 07:22:48,989 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2012-01-10 07:22:49,083 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.CombinerOptimizer - Choosing to move algebraic foreach to combiner
2012-01-10 07:22:49,192 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 5
2012-01-10 07:22:49,192 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 5
2012-01-10 07:22:49,349 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-01-10 07:22:49,364 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2012-01-10 07:22:50,536 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2012-01-10 07:22:50,536 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting identity combiner class.
2012-01-10 07:22:50,552 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=10408717
2012-01-10 07:22:50,552 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
2012-01-10 07:22:50,646 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2012-01-10 07:22:51,145 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2012-01-10 07:22:51,349 [Thread-6] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-01-10 07:22:51,364 [Thread-6] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2012-01-10 07:22:51,380 [Thread-6] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2012-01-10 07:22:52,661 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201201092258_0001
2012-01-10 07:22:52,661 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://10.28.202.165:50030/jobdetails.jsp?jobid=job_201201092258_0001
2012-01-10 07:23:59,655 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:02,655 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:07,655 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:12,654 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:17,654 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:22,653 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 10% complete
2012-01-10 07:24:23,653 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 13% complete
2012-01-10 07:24:26,653 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 16% complete
2012-01-10 07:24:27,653 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 16% complete
2012-01-10 07:24:42,652 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 19% complete
2012-01-10 07:24:57,229 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2012-01-10 07:24:57,229 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
.....
.....
.....
2012-01-10 07:29:07,411 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_201201092258_0005
2012-01-10 07:29:07,411 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - More information at: http://10.28.202.165:50030/jobdetails.jsp?jobid=job_201201092258_0005
2012-01-10 07:29:36,487 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 90% complete
2012-01-10 07:29:51,501 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 93% complete
2012-01-10 07:30:12,171 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2012-01-10 07:30:12,187 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics:

HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
0.20.203.1-SNAPSHOT     0.8.1-SNAPSHOT  avkash  2012-01-10 07:22:49     2012-01-10 07:30:12     GROUP_BY,ORDER_BY,DISTINCT,FILTER

Success!

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201201092258_0001   1       1       54      54      54      39      39      39      clean1,clean2,houred,ngramed1,raw       DISTINCT
job_201201092258_0002   1       1       39      39      39      30      30      30      hour_frequency1,hour_frequency2 GROUP_BY,COMBINER
job_201201092258_0003   1       1       18      18      18      24      24      24      filtered_uniq_frequency,uniq_frequency1,uniq_frequency2,uniq_frequency3        GROUP_BY
job_201201092258_0004   1       1       12      12      12      21      21      21      ordered_uniq_frequency  SAMPLER
job_201201092258_0005   1       1       12      12      12      21      21      21      ordered_uniq_frequency  ORDER_BY        hdfs://10.28.202.165:9000/user/avkash/script1-hadoop-results,

Input(s):
Successfully read 944954 records (10409087 bytes) from: "hdfs://10.28.202.165:9000/user/avkash/excite.log.bz2"

Output(s):
Successfully stored 13528 records (659755 bytes) in: "hdfs://10.28.202.165:9000/user/avkash/script1-hadoop-results"

Counters:
Total records written : 13528
Total bytes written : 659755
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201201092258_0001   ->      job_201201092258_0002,
job_201201092258_0002   ->      job_201201092258_0003,
job_201201092258_0003   ->      job_201201092258_0004,
job_201201092258_0004   ->      job_201201092258_0005,
job_201201092258_0005

2012-01-10 07:30:12,296 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 14 time(s).
2012-01-10 07:30:12,296 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
grunt>

 

Resources: