How to deal with files containing rows with different column counts in U-SQL: Introducing a Flexible Schema Extractor

If your input data is not rectangular in nature, but contains rows with different column counts, the built-in U-SQL extractors will not be able to parse the data. Instead, you should write a custom extractor.

In this blog post we will take a look at a flexible schema extractor that can read a row-oriented file whose rows have different column counts, filling missing columns with null values. If you don't know the full schema of all rows a priori, it also allows you to collect the data that does not fit into the provided columns in a final SqlMap column.

Example Scenario

Let's introduce a simple scenario with some sample data. Assume that the file /Samples/Blogs/MRys/FlexExtractor/OrderData.csv contains orders of different products, where only the first four columns are fixed: order ID, product type, ordered amount, and per-item price. The remaining columns depend, in both number and semantics, on the product type:

1,Shoes,2,99.99,10
1,Pants,3,59,34,32
2,Camera,1,999.00,Canon,70D
2,Lens,1,999.99,Canon,100mm,Macro,1.4
2,Lens,1,459.99,Sigma,28-85mm,Macro/Zoom,2.8
3,Camera,1,745,Sony,RX-100-II
3,Shoes,1,69.99,13

For example, the Camera product type has the camera make and model as additional columns, whereas the Lens product type has additional columns for the make, the focal length, the lens type, and the aperture setting.
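For instance, the row 2,Lens,1,459.99,Sigma,28-85mm,Macro/Zoom,2.8 describes order 2 containing one Sigma 28-85mm macro/zoom lens with an aperture of 2.8 at 459.99 per item.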

Writing scripts to extract data

Let's write a single script that schematizes all possible columns and then produces one file per product type. A naive approach might create an extractor per product type (or a generic extractor that takes the product type as an argument), but that would mean that the file has to be read once for every product type. Obviously, you want to avoid such costly reads if there is a lot of data with many product types. Instead, we will parse the file once and produce a single rowset that is then filtered in subsequent SELECT statements.

Since the additional flexible columns differ across product types, or do not exist for some of them, the EXTRACT statement handles the flexible columns in a generic way (as strings), and the subsequent "typing" SELECT statements give the values their type-specific semantics.

With this information, we can generate the following script:

DECLARE @input string = "/Samples/Blogs/MRys/FlexExtractor/OrderData.csv";

// Usage with all columns known
@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
                c1 string, c2 string, c3 string, c4 string
        FROM @input
        USING new FlexibleSchemaExtractor.FlexExtractor();

// product type Camera
@cameras =
    SELECT orderid,
           orderamount,
           itemprice,
           c1 AS make,
           c2 AS model
    FROM @data
    WHERE producttype == "Camera";

OUTPUT @cameras
TO "/output/cameras.csv"
USING Outputters.Csv();

// product type Lens
@lenses =
    SELECT orderid,
           orderamount,
           itemprice,
           c1 AS make,
           c2 AS focallength,
           c3 AS lenstype,
           c4 == null ? (decimal?) null : Decimal.Parse(c4) AS aperture
    FROM @data
    WHERE producttype == "Lens";

OUTPUT @lenses
TO "/output/lenses.csv"
USING Outputters.Csv();

Note that the EXTRACT statement schematizes the common columns and then uses a generic naming and typing scheme for the rest, providing as many additional columns as the most complex product type requires.
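
To make this concrete, with the sample data above the EXTRACT should produce a rowset along these lines (empty cells denote null values):

orderid | producttype | orderamount | itemprice | c1    | c2        | c3         | c4
--------|-------------|-------------|-----------|-------|-----------|------------|----
1       | Shoes       | 2           | 99.99     | 10    |           |            |
1       | Pants       | 3           | 59        | 34    | 32        |            |
2       | Camera      | 1           | 999.00    | Canon | 70D       |            |
2       | Lens        | 1           | 999.99    | Canon | 100mm     | Macro      | 1.4
2       | Lens        | 1           | 459.99    | Sigma | 28-85mm   | Macro/Zoom | 2.8
3       | Camera      | 1           | 745       | Sony  | RX-100-II |            |
3       | Shoes       | 1           | 69.99     | 13    |           |            |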

Writing the Flexible Schema Extractor UDO

Now we need to implement the extractor:

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace FlexibleSchemaExtractor
{
    public class FlexExtractor : IExtractor
    {
        private Encoding _encoding;
        private byte[] _row_delim;
        private string _col_delim;

        public FlexExtractor(Encoding encoding = null, string row_delim = "\r\n", string col_delim = ",")
        {
            this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
            this._row_delim = this._encoding.GetBytes(row_delim);
            this._col_delim = col_delim;
        }

        public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
        {
            var colsInSchema = output.Schema.Count;

            // let's check global assumptions
            // - first 4 provided columns are int, string, int, decimal.

            if (   output.Schema[0].Type != typeof(System.Int32) 
                || output.Schema[1].Type != typeof(System.String)
                || output.Schema[2].Type != typeof(System.Int32)
                || output.Schema[3].Type != typeof(System.Decimal)
               )
            {
                throw new Exception("First 4 columns are not of expected types int32, string, int32, decimal.");
            }

            foreach (Stream currentline in input.Split(this._row_delim))
            {
                using (StreamReader lineReader = new StreamReader(currentline, this._encoding))
                {
                    string[] columns = lineReader.ReadToEnd().Split( new string[] { this._col_delim }
                                                                   , StringSplitOptions.None);
                    var colsInData = columns.Length;

                    // let's check row level assumptions
                    // - if the data has more columns than the schema, the last
                    //   schema column needs to be of type SqlMap<Int32, string>

                    if (   colsInData > colsInSchema 
                        && output.Schema[colsInSchema - 1].Type != typeof(SqlMap<Int32, string>))
                    {
                        throw new Exception(
                              "Too many columns detected and last column is not of type SqlMap<Int32,string>. " 
                            + "Add a final column of type SqlMap<Int32,string> into your extract schema.");
                    }
                    // Set first 4 fixed columns
                    output.Set<Int32>(0, Int32.Parse(columns[0]));
                    output.Set<String>(1, columns[1]);
                    output.Set<Int32>(2, Int32.Parse(columns[2]));
                    output.Set<Decimal>(3, Decimal.Parse(columns[3]));

                    // Fill all remaining columns except the last which may be a map
                    for (int i = 4; i < Math.Min(colsInData, colsInSchema) - 1; i++)
                    {
                        output.Set<String>(i, columns[i]);
                    }

                    // Now handle the last column: if it is a map, collect all
                    // remaining data columns into it
                    if (   colsInData >= colsInSchema 
                        && output.Schema[colsInSchema - 1].Type == typeof(SqlMap<Int32,string>))
                    {
                        var sqlmap = new Dictionary<Int32,string>();
                        for (int j = colsInSchema - 1; j < colsInData; j++)
                        {
                            sqlmap.Add(j - colsInSchema + 1, columns[j]);
                        }
                        output.Set<SqlMap<Int32,string>>(colsInSchema - 1, new SqlMap<Int32,string>(sqlmap));
                    }
                    // Now handle the last column: if it is not a map, it is just
                    // the last flexible data column (if the row has any)
                    else if (colsInData > 4 && colsInData <= colsInSchema)
                    {
                        output.Set<string>(colsInData - 1, columns[colsInData - 1]);
                    }

                    // Fill the schema columns for which the row provides no data
                    // with nulls, so that values from the previous row do not
                    // leak through the reused output row
                    for (int i = Math.Max(colsInData, 4); i < colsInSchema; i++)
                    {
                        if (output.Schema[i].Type == typeof(SqlMap<Int32, string>))
                        {
                            output.Set<SqlMap<Int32, string>>(i, null);
                        }
                        else
                        {
                            output.Set<string>(i, null);
                        }
                    }

                    yield return output.AsReadOnly();
                }
            }
        }
    }
}

As you can see, the extractor code is fairly simple. The only noteworthy special processing handles the last column, which may be a SqlMap<Int32,string> if we do not know all the columns ahead of the query.
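
One practical note: before the scripts can call the custom extractor (outside of Visual Studio's code-behind experience, which registers it for you), the compiled assembly has to be registered in the catalog and referenced from the script. Here is a minimal sketch, where the DLL path is an assumption for this example:

// Register the UDO assembly once (assumed upload path).
CREATE ASSEMBLY IF NOT EXISTS FlexibleSchemaExtractor
FROM "/Assemblies/FlexibleSchemaExtractor.dll";

// Reference it in every script that uses the extractor.
REFERENCE ASSEMBLY FlexibleSchemaExtractor;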

Extracting Flexible columns with a SqlMap

The extractor allows you to use a final column of type SqlMap<Int32,string> in the following way (@input is defined as above):

@data = EXTRACT orderid int, producttype string, orderamount int, itemprice decimal,
                c1 string, map SqlMap<Int32,string>
        FROM @input
        USING new FlexibleSchemaExtractor.FlexExtractor();

// product type Camera
@cameras =
    SELECT orderid,
           orderamount,
           itemprice,
           c1 AS make,
           map[0] AS model
    FROM @data
    WHERE producttype == "Camera";

OUTPUT @cameras
TO "/output/cameras2.csv"
USING Outputters.Csv();

// product type Lens
@lenses =
    SELECT orderid,
           orderamount,
           itemprice,
           c1 AS make,
           map[0] AS focallength,
           map[1] AS lenstype,
           map[2] == null ? (decimal?) null : Decimal.Parse(map[2]) AS aperture
    FROM @data
    WHERE producttype == "Lens";

OUTPUT @lenses
TO "/output/lenses2.csv"
USING Outputters.Csv();

// remaining product types (serialize map generically)
@others =
    SELECT orderid,
           producttype,
           orderamount,
           itemprice,
           c1,
           map == null ? (string) null 
                       : string.Join(" ; ", 
                                     from p in map 
                                     select string.Format("{0}{1}{2}", p.Key, " : ", p.Value)) AS map 
    FROM @data
    WHERE producttype NOT IN ("Camera", "Lens");

OUTPUT @others
TO "/output/others.csv"
USING Outputters.Csv();

Note that the @others expression serializes the map with a C# expression so that we can use the built-in outputter.
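
With the sample data, only the Shoes and Pants orders fall into @others, so others.csv should end up looking something like this (the Shoes rows have a null map, which serializes to an empty field):

1,"Shoes",2,99.99,"10",
1,"Pants",3,59,"34","0 : 32"
3,"Shoes",1,69.99,"13",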

Conclusion

In this post I showed how easy it is to handle more complex file formats with flexible schemas. You can find the sample data and the sample project in the Azure U-SQL GitHub repository.

Please leave your feedback and questions in the comments below.