Slicing and Dicing: Configuring External Data Input to Azure Data Factory

By Stephen Lewontin, Solution Architect

One of the most common Azure Data Factory scenarios is ingesting externally generated data into an ADF pipeline. Typically the data arrives at regular intervals, and we want the pipeline to pick up the latest imports and begin to process them, also at regular intervals. We don't want ADF to process data more than once, and we don't want ADF to miss any data.

There's lots of detailed documentation on how to set up Azure Data Factory pipelines for a variety of use cases, but while working on the Twitter project I blogged about a few weeks ago, and in subsequent conversations with colleagues, I noticed that the subject of how external data gets scheduled into an ADF pipeline remains a source of confusion. So this week's posting is a mini tutorial on this topic. I'll cover two basic scenarios: copying data from Azure Blob Storage and from Azure SQL.

ADF supports these scenarios by implementing data slicing: a way to divide up chunks of data to be processed based on defined time intervals. You typically configure this by setting up the external data generation process to time-stamp the data as it is stored, and then creating an ADF configuration that uses the slice time to identify the time-stamped data that each slice should pick up.
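To make the idea of slicing concrete, here is a minimal Python sketch that divides a time range into fixed-length, non-overlapping slices. It is not ADF code: the function name time_slices and the one-hour default are my own choices for illustration.

```python
from datetime import datetime, timedelta

def time_slices(start, end, interval=timedelta(hours=1)):
    """Yield (slice_start, slice_end) pairs that cover [start, end) without gaps or overlap."""
    current = start
    while current < end:
        yield current, min(current + interval, end)
        current += interval

# Three hourly slices between noon and 3PM on October 6, 2015.
slices = list(time_slices(datetime(2015, 10, 6, 12), datetime(2015, 10, 6, 15)))
for slice_start, slice_end in slices:
    print(slice_start.isoformat(), "->", slice_end.isoformat())
```

Because each slice ends exactly where the next one begins, any time-stamped item falls into exactly one slice, which is what lets a pipeline avoid both double processing and missed data.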

Copying Data from Azure Blob Storage

Here we'll show how this works beginning with a very simple example: data are regularly dropped into blob storage using blob names that include a date-time "stamp". The ADF pipeline is then configured with a template that filters the blob names to be processed, based on the slice time.

As I said, there's already good documentation on this topic. If you want to try this out, I'd suggest starting with the excellent Move data to and from Azure Blob using Azure Data Factory. I'm not going to cover all of the steps outlined there: I'm just going to focus on the details of the external data configuration steps.

The key element here is the input data source .JSON configuration file, which describes both the format of the input data blobs and the template used to filter blob names based on slice time. Here's an example, from the Twitter project, but very similar to the one in the documentation mentioned above.

{
  "name": "RawTweetTable",
  "properties": {
    "structure": [
      {
        "name": "tweet_id",
        "type": "String"
      },
      {
        "name": "screen_name",
        "type": "String"
      },
      {
        "name": "tweet_text",
        "type": "String"
      }
    ],
    "type": "AzureBlob",
    "linkedServiceName": "RawTweetBlobLS",
    "typeProperties": {
      "folderPath": "raw-tweets/{Year}/{Month}/{Day}/{Hour}",
      "partitionedBy": [
        {
          "name": "Year",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "yyyy"
          }
        },
        {
          "name": "Month",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "%M"
          }
        },
        {
          "name": "Day",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "%d"
          }
        },
        {
          "name": "Hour",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "%H"
          }
        }
      ],
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ","
      }
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    },
    "policy": {
      "externalData": {
        "retryInterval": "00:01:00",
        "retryTimeout": "00:10:00",
        "maximumRetry": 3
      }
    }
  }
}

This specifies a template for finding and retrieving input data tables at regular intervals. The structure element, together with the format element in typeProperties, describes the table format: three columns of comma-separated text, stored in Azure Blob. The external element specifies that this table is external: i.e. that the data is provided by some process outside of the ADF pipeline. The externalData policy then controls how the runtime retries when the expected data is not yet there. In the Twitter project, the data was provided by a Web role that updates a Twitter feed from the Twitter REST API on an hourly basis. The availability element tells the ADF runtime how often to expect new slice data to be available: in this case, once per hour. It makes sense to set availability to correspond to the expected timing when the external process will actually drop data into the blob store: otherwise the copy activity may execute unnecessarily, looking for data that is not, in fact, available.

The remaining elements specify the criteria that the ADF runtime uses to select blob objects for each slice. The logic is as follows:

  1. The external process that populates the blob store with data objects organizes them into hourly time-stamped "folders". For example, a set of 3 objects that correspond to an October 6, 2015 2PM slice could be named as follows:

     

    raw-tweets/2015/10/6/14/01.csv

    raw-tweets/2015/10/6/14/02.csv

    raw-tweets/2015/10/6/14/03.csv

     

    In practice, blob storage is organized as a flat set of object names in top-level containers, so what looks like a folder pathname here (raw-tweets/2015/10/6/14/ ) is actually composed of the container name, "raw-tweets", and the first several elements of blob object names for three data objects within that container. What look like the filenames here (for example, 01.csv ) are actually taken from the last element of each blob object name. For example, the blob name of the first object is 2015/10/6/14/01.csv.

     

    In the partitioning scheme we're going to use here, the "filename" part of the blob object name is ignored, so you can use any unique naming scheme you choose for objects that get dropped into the blob store during a specific time slice. Here I've just used sequential numbers for the "filenames". We'll address partitioning schemes that include a template specification for the "filename" below.

     

  2. The folderPath element of the .JSON gives the template for matching container/blob object names for each slice. The terms in braces ({Year}, {Hour}, etc.) are placeholders for values supplied by the ADF copy activity when it is processing the slices.

     

  3. The values that actually get filled into the placeholders are specified in the partitionedBy element:

     

    1. The values are set from a runtime variable called SliceStart. The ADF copy activity sets the value of this variable when it starts to process a slice.
    2. The "format" property specifies the pattern used to map the SliceStart time to each template element of the folder path. For example, the "%d" format maps the day of the month in the SliceStart time to one or two digits in the folder path. So, for a slice that starts on October 9, 2015, the {Day} placeholder in folderPath is filled in with "9". On October 18th, this would be filled with "18". For details see the .NET Custom Date and Time Format Strings documentation.
    3. As noted above, the .JSON configuration does not explicitly specify a template for the last component of the blob object name (e.g. 01.csv, 02.csv, etc.) The default behavior is to match all objects with names that match the specified template. But it is possible to have the partitioning scheme match the "filename" part of the blob object name too. I've avoided this here to keep things as simple as possible: one advantage of using the default behavior is that the external input process can use any unique names it chooses for the individual slices without having to worry about matching rules for the slice names. However, you may find it useful to design your partitioning scheme to include a template for the filename, using the fileName element in addition to the folderPath element in the configuration JSON, as in the example provided here.

Based on the above logic, the folder path will match all of the files listed above when the slice start time is October 6, 2015 2PM.
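The placeholder logic above can be sketched in a few lines of Python. This is only an illustration of the matching rule, not how the ADF runtime is implemented: the NET_FORMATS table is a hypothetical stand-in that covers just the four .NET format strings used in this dataset, and the blob list is made up.

```python
from datetime import datetime

# Hypothetical stand-ins for the four .NET format strings used in partitionedBy.
NET_FORMATS = {
    "yyyy": lambda t: f"{t.year:04d}",  # four-digit year
    "%M": lambda t: str(t.month),       # month, no leading zero
    "%d": lambda t: str(t.day),         # day of month, no leading zero
    "%H": lambda t: str(t.hour),        # hour 0-23, no leading zero
}

# Placeholder name -> format string, mirroring the partitionedBy element.
PARTITIONED_BY = {"Year": "yyyy", "Month": "%M", "Day": "%d", "Hour": "%H"}

def resolve_folder_path(template, slice_start):
    """Fill the {Year}/{Month}/{Day}/{Hour} placeholders from the slice start time."""
    values = {name: NET_FORMATS[fmt](slice_start) for name, fmt in PARTITIONED_BY.items()}
    return template.format(**values)

prefix = resolve_folder_path("raw-tweets/{Year}/{Month}/{Day}/{Hour}", datetime(2015, 10, 6, 14))

blobs = [
    "raw-tweets/2015/10/6/14/01.csv",
    "raw-tweets/2015/10/6/14/02.csv",
    "raw-tweets/2015/10/6/14/03.csv",
    "raw-tweets/2015/10/6/15/01.csv",  # belongs to the next slice
]
matched = [b for b in blobs if b.startswith(prefix + "/")]
print(prefix, matched)
```

Note that the "filename" part plays no role in the match: anything under the resolved prefix belongs to the slice.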

One thing you will notice if you actually run the sample: the SliceStart times don't directly correspond to the actual times when a slice is processed. By default, a slice is processed at the end of its availability interval, on the assumption that the external source could write data at any time during the interval. In the above example, the slice with a slice start of 6PM and a slice end of 7PM will typically be run sometime shortly after 7PM. This is useful to remember when debugging slices.
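When debugging, it can help to work out by hand which slice a given time belongs to and when that slice becomes eligible to run. The sketch below does this for hourly slices; the function name slice_window is hypothetical, and the code assumes slices aligned on the hour, as in this example.

```python
from datetime import datetime, timedelta

def slice_window(ts, interval=timedelta(hours=1)):
    """Return the (start, end) of the hour-aligned slice that contains ts."""
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + interval

# Data written at 6:42PM lands in the 6PM-7PM slice.
start, end = slice_window(datetime(2015, 10, 6, 18, 42))

# The slice is not processed before its interval has ended.
earliest_run = end
print("slice:", start, "->", end, "| runs no earlier than", earliest_run)
```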

It is also worth noting that there's an implicit assumption that the "time-stamped" path names in fact correspond to meaningful time periods for the blob object contents. There's nothing in the blob storage logic that can enforce this: it is entirely up to the external process that writes blob data to enforce the correct time-stamp semantics. Also, see a similar comment below about "schema" enforcement for blob data.
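On the writer side, this means the external process has to build blob names from the time it actually writes the data. Here is a minimal sketch of such a naming helper, assuming the same unpadded year/month/day/hour layout as the folderPath template; the function name blob_name and the two-digit sequence number are illustrative choices, not taken from the Twitter project code.

```python
from datetime import datetime

def blob_name(written_at, sequence):
    """Build a blob name whose leading elements match raw-tweets/{Year}/{Month}/{Day}/{Hour}."""
    return (
        f"{written_at.year}/{written_at.month}/{written_at.day}/"
        f"{written_at.hour}/{sequence:02d}.csv"
    )

# First object written at 2:05PM on October 6, 2015, inside the raw-tweets container.
name = blob_name(datetime(2015, 10, 6, 14, 5), 1)
print("raw-tweets/" + name)
```

If the writer pads the month, day, or hour with leading zeros while the dataset uses "%M", "%d", and "%H", the names will not match the resolved folder path and the slice will find no data.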

Another note of interest to readers familiar with Apache Hive: you may notice the similarity between the way ADF partitions the blob namespace and Apache Hive partitioning.

Input from Azure SQL

Another common scenario, also well documented, is to input ADF pipeline data from Azure SQL. Here we'll look briefly at the differences from the blob input case. A slightly modified version of the input dataset configuration from the published example is:

{
  "name": "RawTweetSqlInput",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "RawTweetSqlLS",
    "typeProperties": {
      "tableName": "raw_tweet_table"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    },
    "policy": {
      "externalData": {
        "retryInterval": "00:01:00",
        "retryTimeout": "00:10:00",
        "maximumRetry": 3
      }
    }
  }
}

 

This is much simpler than the blob input configuration from above. Specifically, the typeProperties element, which in the blob case contained a detailed template of the storage "folder" schema, is in this case just the SQL table name:

"typeProperties": {
  "tableName": "raw_tweet_table"
}

 

The structure element used in the blob example, which describes the input data schema, is also missing. This makes sense because the SQL data schema should be owned by the database, not the ADF pipeline that consumes the data. In fact, this element is also optional in the blob storage JSON, and is most useful when performing data transformations during the copy operation, as discussed here, but I would recommend keeping it anyway since it provides a useful way to validate the "folder" schema used by ADF for the blob copy activity.

For the SQL case we still need to timestamp the input data so that the ADF copy activity can identify which items to process for each slice. Here, the input items are database rows rather than blob objects with path-like time-based names. We accomplish this by including a time stamp column in the SQL table schema and ensuring that this contains the current time when each row is written.

Here's a fragment of the SQL statement used in the Twitter project to create the raw tweets table:

CREATE TABLE [dbo].[raw_tweet_hourly] (
    . . .
    [time_stamp] DATETIME NOT NULL,
    [tweet_id] NCHAR (20) NOT NULL,
    [screen_name] NVARCHAR (50) NOT NULL,
    [text] NVARCHAR (256) NULL,
    . . .
);

And here's a snippet of .NET C# code from the Twitter Web role implementation that formats each row to write to the raw tweet table:

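// Format one row of values; the first placeholder renders the current
// time as a SQL datetime literal.
string row = string.Format(CultureInfo.InvariantCulture,
    "\'{0:yyyyMMdd HH:mm:ss}\', \'{1}\', \'{2}\', \'{3}\'",
    DateTime.Now, tweet_id, screen_name, escapedTweetText);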

 

The date time format string, {0:yyyyMMdd HH:mm:ss}, gets the .NET DateTime value into a proper SQL datetime format that can be matched by the ADF copy activity. In the Twitter project, the Web role is coded in C#, but of course one can use any of the supported .NET languages to do the same thing.
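If the external process is not written in a .NET language, the same time stamp layout is easy to reproduce. As a rough sketch, here is the Python equivalent of the yyyyMMdd HH:mm:ss pattern; the helper name sql_timestamp is hypothetical.

```python
from datetime import datetime

def sql_timestamp(ts):
    """Render ts like the .NET pattern yyyyMMdd HH:mm:ss."""
    return ts.strftime("%Y%m%d %H:%M:%S")

stamp = sql_timestamp(datetime(2015, 10, 6, 14, 5, 9))
print(stamp)
```

Whichever language you use, pass in a clock (local or UTC) that matches the time zone your slice windows are expressed in, otherwise rows will land in the wrong slice.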

Now we just need a way to make the time stamps visible to the ADF copy activity that will process the slices. In the SQL case, this happens in the pipeline configuration JSON rather than in the input dataset configuration. I won't show the complete JSON pipeline configuration here since it is virtually identical to the configuration given in the link above. The key thing to notice is the source element for the input SQL source:

"source": {
  "type": "SqlSource",
  "SqlReaderQuery": "$$Text.Format('select * from raw_tweet_table where time_stamp >= \\'{0:yyyy-MM-dd HH:mm:ss}\\' AND time_stamp < \\'{1:yyyy-MM-dd HH:mm:ss}\\'', WindowStart, WindowEnd)"
}

 

The SqlReaderQuery element specifies a T-SQL query that will pick up rows with timestamps falling in the time period between the values of the ADF runtime variables WindowStart and WindowEnd. These variables are similar to the SliceStart and SliceEnd variables we saw above, but in this case they correspond to the scheduled activity run start and end times, as specified by the scheduler element in the pipeline JSON:

"scheduler": {
  "frequency": "Hour",
  "interval": 1
}

 

 

By default, these variables select the same time range in the table rows as the corresponding slice variables select in the blob object "pathnames", though this can be changed by setting specific properties in either the input JSON availability element or the pipeline JSON scheduler element.

The result is that the slicing behavior here is quite similar to the behavior in the blob input case: the copy activity can pick up multiple rows with timestamps that fall within the defined window, just as the blob copy activity can pick up multiple blob objects within the same hourly "folder".
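The window query is worth trying out on a small scale, because its half-open form (>= start, < end) is what guarantees that a row sitting exactly on an hour boundary is picked up by one slice and only one. The sketch below uses an in-memory SQLite database purely as a stand-in for Azure SQL; the table has been cut down to two columns and the sample rows are invented.

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_tweet_table (time_stamp TEXT NOT NULL, tweet_id TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO raw_tweet_table VALUES (?, ?)",
    [
        ("2015-10-06 13:59:59", "a"),  # previous slice
        ("2015-10-06 14:00:00", "b"),  # exactly on the window start
        ("2015-10-06 14:30:00", "c"),
        ("2015-10-06 15:00:00", "d"),  # exactly on the window end: next slice
    ],
)

def rows_for_window(window_start, window_end):
    """Return the tweet ids whose time stamp falls in [window_start, window_end)."""
    fmt = "%Y-%m-%d %H:%M:%S"
    cursor = conn.execute(
        "SELECT tweet_id FROM raw_tweet_table "
        "WHERE time_stamp >= ? AND time_stamp < ? ORDER BY time_stamp",
        (window_start.strftime(fmt), window_end.strftime(fmt)),
    )
    return [row[0] for row in cursor]

two_pm = datetime(2015, 10, 6, 14)
first = rows_for_window(two_pm, two_pm + timedelta(hours=1))
second = rows_for_window(two_pm + timedelta(hours=1), two_pm + timedelta(hours=2))
print(first, second)
```

Had the query used <= on the upper bound, the 3PM row would have been copied by both the 2PM and the 3PM windows.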

One important note: when designing a database that holds time-sliced input data, it is important to consider how well datetime-based queries like the one shown above will scale for the very large data sets typical in ADF scenarios. You should consider implementing an indexing and partitioning strategy that will perform well with your anticipated data stream. Typically this means treating the datetime column as a partitioning column and creating appropriate indexes. For a detailed discussion on this topic, see the MSDN Partitioned Tables and Indexes documentation.
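As a small illustration of why the index matters, the following sketch creates an index on the time stamp column and asks the database how it plans to run the window query. It again uses SQLite as a stand-in, so it shows only the indexing half of the advice: SQLite has no table partitioning, and SQL Server's planner and tooling are different. The index name is made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_tweet_table (time_stamp TEXT NOT NULL, tweet_id TEXT NOT NULL)")
conn.execute("CREATE INDEX ix_raw_tweet_time_stamp ON raw_tweet_table (time_stamp)")

# Ask SQLite for its plan for the hourly window query.
plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM raw_tweet_table WHERE time_stamp >= ? AND time_stamp < ?",
    ("2015-10-06 14:00:00", "2015-10-06 15:00:00"),
).fetchall()

# The last column of each plan row is the human-readable description.
plan_text = " ".join(row[-1] for row in plan)
print(plan_text)
```

With the index in place, the plan should be a range search on the index rather than a scan of the whole table, which is the behavior you want each hourly slice to have as the table grows.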

Details

There's plenty of documentation for Azure Data Factory pipeline configuration, enough that it can sometimes be difficult to figure out where to find the details you need to implement a specific use case. Here are some pointers to detailed documentation, which you may find useful when building out an implementation: