Sliding Window Data Partitioning on Microsoft Azure HDInsight

HCatalog is a table and storage management layer for Hadoop that enables users with different data processing tools, such as Pig, MapReduce, Hive, and Oozie, to read and write data. HCatalog's table abstraction presents these tools and users with a relational view of data in the cluster, which makes it a natural metadata integration point. For example, Pig is often used for ETL, bringing in data and adding it to metastore partitions. When using Oozie to schedule workflows, it is key to integrate with HCatalog so that Oozie can sense the presence of data in a partition and act on that information. Apache Oozie 4.0 introduces HCatalog Integration, and HDInsight clusters starting with version 3.0 support it as well. If you are interested in reading more about the HCatalog Integration feature, you can find more information in the Apache documentation here

Configuration and Deployment

The process of configuring HCatalog Integration with Oozie is detailed here. But, please note that these configuration steps are already done for you on HDInsight and the feature is available out of the box when you create a cluster! Let us do a quick run-down on the configuration highlights for this feature:

  1. PartitionDependencyManagerService and HCatAccessorService are required to work with HCatalog and to support coordinators that use HCatalog URIs as data dependencies. Both of these are configured in oozie-site.xml on HDInsight.
  2. HCatalog polling frequency is determined by the configuration parameter oozie.service.coord.push.check.requeue.interval. On HDInsight this is set to 30000 milliseconds (30 seconds) by default, but you can change it to a polling frequency that suits your environment. The Add-AzureHDInsightConfigValues PowerShell cmdlet can be used to specify a custom Oozie configuration when building the HDInsight cluster, as described here
  3. All the necessary JARs needed for HCatalog integration are also configured out of the box for you with the HDInsight cluster.
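For reference, overriding the polling interval amounts to a property like the following in oozie-site.xml (the value shown is the HDInsight default of 30000 ms; this is a sketch of the setting, not a required change):

```xml
<property>
  <!-- How often (in milliseconds) Oozie re-checks HCatalog for new partitions -->
  <name>oozie.service.coord.push.check.requeue.interval</name>
  <value>30000</value>
</property>
```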

HCatalog URI format

HCatalog partitions can now be defined as a data dependency with a URI notation. The general notation for an HCatalog table partition URI is hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];…

With HDInsight, the metastore server points to the Hive metastore that you configured when provisioning the cluster. The default port is 9083.

Here is a sample URL from my HDInsight cluster:

<uri-template>hcat://headnode0:9083/default/samplelog/dt=${YEAR}-${MONTH}-${DAY}${HOUR}${MINUTE}</uri-template>

This port value comes from the hive.metastore.uris property in hive-site.xml:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://headnode0:9083</value>
</property>

What is Sliding Window Partitioning?

Hive provides a nice way to organize data logically with partitions. As an example, you may want to retain only 3 months of server log data at any time. A sliding window is a window of fixed width that can accommodate only a certain number of partitions at any given time. When a new partition comes into the window, the oldest partition is automatically archived out. Archival could mean moving the data from that partition into a HAR (Hadoop archive) file, moving it into a different Hive table, or deleting the partition entirely. Note that on HDInsight, if the table is external, dropping a partition leaves the underlying data in Azure Blob storage.

HCatalog Integration and Sliding Window

So, the key to implementing a sliding window mechanism is to sense the arrival of the next partition and automatically archive the oldest partition, in line with the retention policy determined by the business. In our example, when data for the next month arrives, we need to drop the partition for the oldest month in the sliding window. This is a nice fit for HCatalog integration from an Oozie coordinator app: you can schedule an Oozie coordinator application that runs once every month and checks whether the partition for the next month has been created, at which point it deletes the oldest partition. The HCatalog check uses the URI format given above.

Apache Oozie Terminology

Oozie is a workflow scheduler system to manage Apache Hadoop jobs [1]. Oozie workflow jobs are Directed Acyclical Graphs (DAGs) of actions[1]. Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability [1]. With addition of HCatalog support, Coordinators also support specifying a set of HCatalog table partitions as a dataset [2]. The workflow is triggered when the HCatalog table partitions are available and the workflow actions can then read the partition data [2].

Let us take a specific example for the sliding window and implement it with an Oozie coordinator application. We need an Oozie coordinator application that runs at a frequency of once every month; for demo purposes, we will schedule the application so that it starts in the current month and ends a few minutes after the job start time, so that we don't leave the test Oozie coordinator app running on the cluster and taking up resources. Below, I will define some terminology for an Oozie coordinator application –

Oozie Coordinator application – (implemented as slidingwindow/coordinator.xml in our example)

A coordinator application defines the conditions under which coordinator actions should be created (the frequency) and when the actions can be started. The coordinator application also defines a start and an end time. Normally, coordinator applications are parameterized. A Coordinator application is written in XML [3].

The screen shot below shows the example coordinator application. You can give it any name; in this example we name it "MY_APP". As you can see from the screen shot below, the coordinator application is defined in XML, and the first line defines the name of the job, the frequency of recurrence of the coordinator actions within the job, the start time for the job, the end time for the job, the time zone, and the XML namespace. When a coordinator job is submitted to Oozie, the submitter may specify as many coordinator job configuration properties as required (similar to Hadoop JobConf properties). Configuration properties that are a valid Java identifier, [A-Za-z_][0-9A-Za-z_]*, are available as ${NAME} variables within the coordinator application definition. [3]

In the example below, the frequency is set to ${coord:months(1)}. ${coord:months(1)} is an expression language (EL) function that returns the number of minutes in 1 complete month, so the Oozie coordinator application is set to recur once a month. ${jobStart} indicates the start time for this job, which we will pass in from the job properties payload within PowerShell. ${jobEnd} indicates the time at which the job should end, and this again will be passed from the job properties payload. The job properties payload is described in greater detail further down in this blog.
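Since the screen shots are not reproduced here, the following is a minimal sketch of what such a coordinator definition could look like, based on the details above (the schema version and the ${wfPath} placeholder are illustrative assumptions; only MY_APP, the frequency, ${jobStart}, and ${jobEnd} come from the example):

```xml
<coordinator-app name="MY_APP" frequency="${coord:months(1)}"
                 start="${jobStart}" end="${jobEnd}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
  <datasets>
    <!-- dataset definitions go here -->
  </datasets>
  <input-events>
    <!-- input events referencing the dataset go here -->
  </input-events>
  <action>
    <workflow>
      <!-- ${wfPath} is a placeholder for the workflow that drops the oldest partition -->
      <app-path>${wfPath}</app-path>
    </workflow>
  </action>
</coordinator-app>
```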

Within the coordinator application, one can define the datasets, input events, and actions as seen on the code snippet above. We will look at each of these in greater detail below:

Synchronous Dataset

The screen shot below shows the synchronous dataset defined for our example. Synchronous dataset instances are generated at fixed time intervals, and there is a dataset instance associated with each time interval. [3] In our example, datasets are produced once every month. initial-instance defines the lower bound/baseline for the dataset, so any instances of the dataset occurring before the initial-instance are silently ignored. For the purpose of this example, we set the initial-instance to the same value as the job start time. This is because, in our case, we will be looking for the presence of a future partition, the one corresponding to the next month, before we act and drop the oldest partition.

Notice, in the dataset definition below, how we reference the HCatalog URI for HDInsight. We are looking for a specific partition in the format dt=YYYY-MM on the table samplelog before we proceed.
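A sketch of such a dataset definition, assuming the dataset name logmonths (the name is a placeholder; the uri-template follows the partition format and table described above, and ${initialInstance} comes from the job properties payload):

```xml
<datasets>
  <dataset name="logmonths" frequency="${coord:months(1)}"
           initial-instance="${initialInstance}" timezone="UTC">
    <!-- One partition per month on table samplelog in the default database -->
    <uri-template>hcat://headnode0:9083/default/samplelog/dt=${YEAR}-${MONTH}</uri-template>
  </dataset>
</datasets>
```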

Input Events

The input events of a coordinator application specify the input conditions that are required in order to execute a coordinator action [3]. In our example, we would need the partition for the next month to be present in order for the coordinator action to execute. So, we can use the EL (Expression Language) function ${coord:current(n)} which represents the nth dataset instance for a synchronous dataset, relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. So, ${coord:current(1)} would represent the instance corresponding to the 1st instance of month from the start time. So, given the start month of May, the instance represented by ${coord:current(1)} would be the month of June.
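Under those assumptions, the input event could be written as follows (the data-in name and the logmonths dataset name are illustrative placeholders):

```xml
<input-events>
  <!-- The coordinator action waits until this dataset instance exists -->
  <data-in name="nextMonthReady" dataset="logmonths">
    <!-- ${coord:current(1)} resolves to the instance one month after the action's nominal time -->
    <instance>${coord:current(1)}</instance>
  </data-in>
</input-events>
```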

Oozie Payload – job.properties

The Oozie payload/job.properties is used to pass the configuration parameters to the Oozie coordinator application. The XML payload below defines the various parameters passed to the Oozie coordinator job, such as jobStart, jobEnd, and initialInstance.
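The payload itself is not reproduced here; when submitted from PowerShell it is a Hadoop-style configuration document along these lines (all timestamp values are illustrative placeholders, chosen to match the demo setup where the job ends a few minutes after it starts):

```xml
<configuration>
  <property>
    <name>jobStart</name>
    <value>2014-05-01T00:00Z</value> <!-- illustrative job start time -->
  </property>
  <property>
    <name>jobEnd</name>
    <value>2014-05-01T00:10Z</value> <!-- a few minutes after jobStart, per the demo -->
  </property>
  <property>
    <name>initialInstance</name>
    <value>2014-05-01T00:00Z</value> <!-- same as jobStart, per the example -->
  </property>
</configuration>
```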

Bringing it all together with PowerShell

Please see the video for a demo on implementing sliding window mechanism for a Hive partitioned table on HDInsight. All the scripts demonstrated in the video can be downloaded from here

This simple example shows how you can use the HCatalog polling available with HCatalog Integration in Apache Oozie 4.0 to meet a data archival need such as sliding window data partitioning. It is easy to do starting with version 3.0 HDInsight clusters.

Dharshana Bharadwaj (@dharshb)

Thank you to JasonH and Bill Carroll for reviewing this!

References

  1. Oozie Apache Wiki - https://oozie.apache.org/
  2. Oozie 4.0: HCatalog Integration Explained - https://oozie.apache.org/docs/4.0.0/DG_HCatalogIntegration.html
  3. Oozie Coordinator Functional Specification - https://oozie.apache.org/docs/4.0.0/CoordinatorFunctionalSpec.html
  4. Use time-based Oozie Coordinator with HDInsight - https://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-oozie-coordinator-time/