U-SQL Script with Python extension to detect Invalid input files

 

The script below validates each of input files in the folder and the python script splits and count the number of columns in each row of every. Those files that either have > or < than 9 columns in any of its rows are all logged as Invalid files.

 

REFERENCE ASSEMBLY [ExtPython];

 

DECLARE @ReduceScript = @"

def validate_rows(linetext, MaxCols):

return str((len(linetext.split(',')) - MaxCols))

 

def usqlml_main(df):

    MaxCols = 9

    df['noofcolumns'] = df.LineText.apply(validate_rows, args={MaxCols})

    del df['extension']

    del df['LineText']

 

    return df

";

 

@AllData =    EXTRACT     LineText string,

                filename string,

                extension string

        FROM         "/DataLoads/Input/{filename}.{extension}"

        USING         Extractors.Text(delimiter: '\n', skipFirstNRows: 1);

 

@AllData = SELECT LineText,filename, extension FROM @AllData;

 

@ReducedData =

            REDUCE         @AllData

            ON            filename

            PRODUCE        noofcolumns string,filename string

            USING         new Extension.Python.Reducer(pyScript:@ReduceScript);

 

@InvalidFiles =         SELECT DISTINCT filename

            FROM         @ReducedData

            WHERE         noofcolumns != "0";

@ValidFiles =

            SELECT DISTINCT filename

            FROM @ReducedData AS a

            LEFT ANTISEMIJOIN @InvalidFiles AS b ON a.filename == b.filename;

 

OUTPUT @InvalidFiles

TO "/DataLoads/InvalidFiles.txt"

USING Outputters.Text();

 

OUTPUT @ValidFiles

TO "/DataLoads/ValidFiles.txt"

USING Outputters.Text();

 

The EXTRACT Statement loads all files from the relative path "/DataLoads/Input" along with the filename attribute. The entire row is picked up as a single column and the column separator is assumed to be a comma (",").

The REDUCER groups the reduction by filename and then invokes the python script with the data frame. The python script calls the validate_row for each row with the row as a string. If the number of columns in the row is <9, the result will be a negative value and the result will be a positive value if the #of columns exceed 9. In essence, the result should be 0. The REDUCER produces the name of the file and the number of columns every row.

The files that do have noofcolumns <> 0 is all routed to an InvalidFile list.

Follow the instructions here to enable Python extension on the ADL-A account