Something's Brewing with Azure Data Factory - Part 3

In the first two parts of this blog series (HERE and HERE), we used Azure Data Factory to load Beer review data from an Azure SQL Database to an Azure Blob Storage account. We then processed that data using HDInsight and the Mahout Machine Learning Library to generate user-based recommendations. In this final post, we will take those recommendations and massage them back into our Azure SQL Database where they can be consumed by users.

The Set-Up

The output of the Mahout MapReduce job is a file of recommendations in a pivoted format. The format of the file is as follows which each user represented on a single line:

 {USERID}    [{BEERID}:{ESTIMATEDRATING},{BEERID}:{ESTIMATEDRATING},{BEERID}:{ESTIMATEDRATING},....]

In order to make use of these recommendations we need to unpivot the data such that each user/beer recommendation is on a single line in comma-separated value (CSV) format. To do this we will use a couple of iterations of Hive queries to transform and filter the recommendations before loading.

Creating the Hive Job

The transformation of the Mahout generated recommendations into a CSV file occurs in four steps. I have documented the steps below but will leave the detail explanation of how Hive queries work for some other time.

Step 1 - Create a Hive table over-top of the Mahout output
 DROP TABLE IF EXISTS RecommendationsRaw;
CREATE EXTERNAL TABLE RecommendationsRaw (
    USERID INT,
    RECOMMENDATIONS STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
LOCATION '${hiveconf:MahoutInput}';
Step 2 - Strip the square-brackets (‘[‘ and ‘]’) and load data into a clean staging table
 DROP TABLE IF EXISTS RecommendationsStaged;
CREATE EXTERNAL TABLE RecommendationsStaged (
    USERID INT,
    RECOMMENDATIONS STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
LOCATION '${hiveconf:MahoutStaging}';

INSERT OVERWRITE TABLE RecommendationsStaged
SELECT
    USERID,
    REGEXP_REPLACE(
        REGEXP_REPLACE(RECOMMENDATIONS, 
                "\\[", ""), 
            "\\]", "") AS RECOMMENDATIONS
FROM RecommendationsRaw;
Step 3 – Create a new Hive table over leverage Hive’s support for complex types (arrays and stucts)
 DROP TABLE IF EXISTS MahoutRecommendations;
CREATE EXTERNAL TABLE MahoutRecommendations (
    USERID INT,
    RECOMMENDATIONS ARRAY<STRUCT<BEERID:STRING,RATING:STRING>>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n'
LOCATION '${hiveconf:MahoutStaging}';
Step 4 – Unpivot the data and write to a CSV formatted file
 DROP TABLE IF EXISTS Recommendations;
CREATE EXTERNAL TABLE Recommendations (
    USERID INT,
    BEERID INT,
    RATING STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION '${hiveconf:RecommendationsOutput}';

INSERT OVERWRITE TABLE Recommendations
SELECT
   USERID,
   r.BEERID,
   r.RATING
FROM MahoutRecommendations
LATERAL VIEW explode(RECOMMENDATIONS) RECOMMENDATION_TABLE AS r;

Taking all four pieces of the script, we assemble them into a single file with an *.hql extension. The file should then be uploaded your blob storage account. The Azure Data Factory pipeline will reference this script based on the configuration described in my previous blog post.

Creating the SQL Server Table Type & Procedures

To efficiently load data back into our Azure SQL Database we use the definition of a custom table type and a stored procedure. Our recommendation data is then passed as a table variable to the stored procedure which is ultimately responsible for handling the heavy lifting and performing any last transformations required to load the data into the database table structures. The table-type and stored procedure DDL are listed below and will be reference later when we specify our Azure Data factory pipeline.

 CREATE TYPE [dbo].[RecommendationType] AS TABLE(
    [UserID] [int] NULL,
    [BeerID] [int] NULL,
    [EstimatedScore] [decimal](18, 2) NULL
)
 CREATE PROCEDURE [dbo].[IngressUserRecommendations]
    @Recommendations dbo.RecommendationType READONLY
AS
    BEGIN TRY
        BEGIN TRAN
            DELETE FROM Recommendation

            INSERT Recommendation (UserID, BeerID, EstimatedScore)
            SELECT
                UserID, BeerID, EstimatedScore
            FROM @Recommendations
        COMMIT TRAN
    END TRY
    BEGIN CATCH
        IF @@TRANCOUNT > 0
        BEGIN
            ROLLBACK TRAN
        END
    END CATCH
GO

Wiring in the Azure Data Factory Pipeline

Recall from my prior post that we defined a MahoutRecommendations dataset that acts as the input into this new pipeline. Before we create the pipeline however, we need to define two new datasets. The first dataset represents the output generated by the fourth step of the Hive job in out Azure Blob Storage account. For this dataset, we need to define a schema structure since it will be used to map columns and load the data back into our Azure SQL database. The JSON configuration is provided below.

 {
    "name": "Recommendations",
    "properties":
    {
        "structure":  
        [ 
            { "name": "UserID", "type": "INT"},
            { "name": "BeerID", "type": "INT"},
            { "name": "EstimatedScore", "type": "STRING"}
        ],
        "location": 
        {
            "type": "AzureBlobLocation",
            "folderPath": "dfdemo/recommendations/{Year}_{Month}_{Day}/",
            "format":
            {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
            "linkedServiceName": "StorageLinkedService",
            "partitionedBy":
            [
                { name: "Year", value: { type: "DateTime", date: "SliceStart", format: "yyyy" } },
                { name: "Month", value: { type: "DateTime", date: "SliceStart", format: "%M" } },
                { name: "Day", value: { type: "DateTime", date: "SliceStart", format: "%d" } }
            ]
        },
        "availability": 
        {
            "frequency": "Day",
            "interval": 1            
        }
    }
}

The second dataset acts as our connection back to the Azure SQL Database. Note in the JSON below, I went ahead and specified the Recommendations table in the dataset, but that will not be used by our pipeline.

 {
    "name": "RecommendationsSQL",
    "properties":
    {
        "location":
        {
            "type": "AzureSqlTableLocation",
            "tableName": "Recommendations",
            "linkedServiceName": "AzureSqlLinkedService"
        },
        "availability": 
        {
            "frequency": "Day",
            "interval": 1            
        }
    }
}

Finally, we define the pipeline. This pipeline use the MahoutRecommendations dataset as a input and consists of two activities: a HDInsight Activity and a Copy Activity. The HDInsight Activity references the HDInsight Linked Service defined in the previous blog entry and uses the Hive transformation to execute the *.hql script we defined above. The extended properties node is used to pass the various Hive configuration properties required by the script. The Copy Activity handles the egress of data from the Hive transformation back to the Azure SQL database. In this configuration we reference our two new datasets and configure the source and sinks for the movement. In the sink definition we reference the new SQL Table Type and Stored Procedure created above. The entire JSON configuration winds up looking as follows:

 {
    "name": "EgressPipeline",
    "properties":
    {
        "description" : "Unpivot the Mahout Recommendation output and load to Azure SQL Database",
        "activities":
        [
            {
                "name": "Unpivot",
                "description": "Unpivot Mahout Recommendation output",
                "type": "HDInsightActivity",
                "inputs": [ {"name": "MahoutRecommendations"}],
                "outputs": [ {"name": "Recommendations"} ],
                "linkedServiceName": "HDILinkedService",
                "transformation":
                {
                    "type": "Hive",
                    "extendedProperties":
                    {
                        "MahoutInput": "$$Text.Format(
                            'wasb://<CONTAINER>@<ACCOUNT>.blob.core.windows.net/mahout/recommendations/{0:yyyy_MM_dd}/', 
                                Time.AddHours(SliceStart, 0))",
                        "MahoutStaging": "$$Text.Format(
                            'wasb://<CONTAINER>@<ACCOUNT>.blob.core.windows.net/mahout/stage/{0:yyyy_MM_dd}/', 
                                Time.AddHours(SliceStart, 0))",
                        "RecommendationsOutput": "$$Text.Format(
                            'wasb://<CONTAINER>@<ACCOUNT>.blob.core.windows.net/recommendations/{0:yyyy_MM_dd}/', 
                                Time.AddHours(SliceStart, 0))"
                    },        
                    "scriptpath": "dfdemo\\scripts\\unpivotrecommendations.hql",                
                    "scriptLinkedService": "StorageLinkedService"
                },
                "policy":
                {
                    "concurrency": 1,
                    "executionPriorityOrder": "OldestFirst",
                    "retry": 1,
                    "timeout": "01:00:00"
                }
            },
            {
                "name": "EgressData",
                "description": "Egress recommendations to Azure SQL DB",        
                "type": "CopyActivity",
                "inputs": [ {"name": "Recommendations"} ],
                "outputs": [ {"name": "RecommendationsSQL"} ],    
                "transformation":
                {
                    "source":
                    {                               
                        "type": "BlobSource"
                    },
                    "sink":
                    {
                        "type": "SqlSink",
                        "SqlWriterTableType": "RecommendationType",
                        "SqlWriterStoredProcedureName": "IngressUserRecommendations"
                    }            
                },
                "Policy":
                {
                    "concurrency": 1,
                    "executionPriorityOrder": "OldestFirst",
                    "style": "StartOfInterval",
                    "retry": 0,
                    "timeout": "01:00:00"
                }
            }
        ]
      }
}

As in the previous post, to update the data factory with all the new pieces we use PowerShell.

Wrap-Up

Using the Azure Portal, you can browse the completed Data Factory using the diagram view (see below) to see the final result. When the pipeline runs it will create a HDInsight Cluster on-demand (if one does not exist from a prior step), execute the Hive script transformation and then load the data to the Azure SQL Database.

image

 

This example used multiple pieces of the new Azure Data Factory service but has only scratched the surface of its overall capabilities. Keep in mind that this service is still in Preview and will evolve. This evolution may change some of the current capabilities discussed and will certainly introduce new capabilities. I hope these posts sparks your interest in this new technology and moves you to give it a shot in your own use case. As always feel free to let me know if you have any questions or suggestions. Till next time!

Chris