“Cloud Numerics” Example: Using the IParallelReader Interface

To get your big-data applications to work, you need to read in the data, but what if your data is too big? What if the dataset you are reading requires more memory than is available on any single machine? This is a common constraint in any discipline that handles large datasets. With “Cloud Numerics” you can read data in parallel across several machines, such as the nodes of a Windows Azure cluster.

This post walks you through the “Cloud Numerics” IParallelReader interface, which is part of an input framework that facilitates the parallel loading of distributed arrays from storage.

The IParallelReader interface supports the following general workflow:

  1. The master process determines how the work is partitioned among all of the processes, based on information about the dataset that the calling routine provides.
  2. Next, the master process and each worker process load their portion of the distributed array according to that partitioning scheme. Each process returns a Local.NumericDenseArray.
  3. Finally, the Local.NumericDenseArrays created in the previous step are assembled to form the Distributed.NumericDenseArray that is the output of the parallel reader.
Note!
The IParallelReader interface leverages the distributed runtime. For more details, please see this post.

This data input framework takes care of all communication and assembly, so you only need to supply code for the first two steps of the workflow and specify how the local pieces are assembled into the Distributed.NumericDenseArray. You supply this information through an instance of a class that implements the IParallelReader<T> interface. The class must contain the following:

  object[] ComputeAssignment(int totalLocales)
    This function runs only on the master process and determines how the work is partitioned across all processes, including the master itself. It must return an array (call it “result”) of length totalLocales, where result[i] holds the instructions that will be sent to rank i. Note! result must be an array of a serializable type.
  Microsoft.Numerics.Local.NumericDenseArray<T> ReadWorker(object assignment)
    This function runs on every locale using the instructions computed by ComputeAssignment: on rank i it is called with result[i] as its argument. It does the work of loading one portion of the stored data and returns a local NumericDenseArray.
  An integer property called DistributedDimension
    DistributedDimension tells the framework how to assemble the local pieces into the final distributed array: for DistributedDimension = j, the Local.NumericDenseArrays are concatenated along dimension j to construct the Distributed.NumericDenseArray.
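To make this contract concrete, here is a minimal sketch that runs without “Cloud Numerics”. It is a stand-in under stated assumptions, not the real API: IToyParallelReader, ToyLoader, and RankBlockReader are hypothetical names, double[,] replaces Local.NumericDenseArray<T>, and all ranks run sequentially in one process, so nothing is serialized or sent between machines. It only mirrors the three workflow steps for DistributedDimension = 0.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IParallelReader<T>; double[,] replaces
// Microsoft.Numerics.Local.NumericDenseArray<T> so the sketch is self-contained.
public interface IToyParallelReader
{
    int DistributedDimension { get; }
    object[] ComputeAssignment(int totalLocales);
    double[,] ReadWorker(object assignment);
}

// Hypothetical sequential driver that mimics the three workflow steps in one process.
public static class ToyLoader
{
    public static double[,] LoadData(IToyParallelReader reader, int totalLocales)
    {
        if (reader.DistributedDimension != 0)
            throw new NotSupportedException("This sketch only stacks along dimension 0.");

        // Step 1: the "master" computes one assignment per rank.
        object[] assignments = reader.ComputeAssignment(totalLocales);
        if (assignments.Length != totalLocales)
            throw new InvalidOperationException("Need exactly one assignment per rank.");

        // Step 2: rank i reads its piece using assignments[i].
        var pieces = new List<double[,]>();
        for (int rank = 0; rank < totalLocales; rank++)
        {
            double[,] piece = reader.ReadWorker(assignments[rank]);
            if (piece != null) pieces.Add(piece);   // a rank may contribute nothing
        }

        // Step 3: stack the local pieces along dimension 0, in rank order.
        int cols = pieces.Count > 0 ? pieces[0].GetLength(1) : 0;
        int rows = 0;
        foreach (var p in pieces)
        {
            if (p.GetLength(1) != cols)
                throw new InvalidOperationException("All pieces must have the same number of columns.");
            rows += p.GetLength(0);
        }

        var result = new double[rows, cols];
        int offset = 0;
        foreach (var p in pieces)
        {
            for (int i = 0; i < p.GetLength(0); i++)
                for (int j = 0; j < cols; j++)
                    result[offset + i, j] = p[i, j];
            offset += p.GetLength(0);
        }
        return result;
    }
}

// Hypothetical example reader: rank i produces a block of rows filled with the value i.
public class RankBlockReader : IToyParallelReader
{
    private readonly int rowsPerRank;
    private readonly int cols;

    public RankBlockReader(int rowsPerRank, int cols)
    {
        this.rowsPerRank = rowsPerRank;
        this.cols = cols;
    }

    public int DistributedDimension => 0;

    public object[] ComputeAssignment(int totalLocales)
    {
        var result = new object[totalLocales];
        for (int i = 0; i < totalLocales; i++) result[i] = i;   // the "instructions" are just the rank number
        return result;
    }

    public double[,] ReadWorker(object assignment)
    {
        int rank = (int)assignment;
        var piece = new double[rowsPerRank, cols];
        for (int r = 0; r < rowsPerRank; r++)
            for (int c = 0; c < cols; c++)
                piece[r, c] = rank;
        return piece;
    }
}
```

For example, ToyLoader.LoadData(new RankBlockReader(2, 3), 4) returns an 8-by-3 array whose rows 0-1 hold 0, rows 2-3 hold 1, and so on, because the pieces are stacked in rank order.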

Once the class is defined, use it by creating an instance and passing it to the LoadData<T> method. For example:

var reader = new CustomReader(<args>);  // CustomReader is your IParallelReader<T> implementation
var result = Microsoft.Numerics.Distributed.IO.Loader.LoadData<T>(reader);
Note!
Information that is common to all ranks does not need to be placed in the result of ComputeAssignment. It can be stored in fields of your IParallelReader<T> class instead, because the framework sends the instance passed to LoadData to all of the processes. This happens before ComputeAssignment is called, so any changes that ComputeAssignment makes to the instance are seen only by the master process.

We’ve provided a sample CSV file reader that implements the interface described above. Its reader class has fields that hold the name of the file and other information common to all ranks, such as the file's length. ComputeAssignment uses the length to create an array of (start, end) offset pairs that serve as the instructions for the ranks. Each rank's ReadWorker call then has the file name (common to all ranks) and its own (start, end) pair; with this information, each rank opens the file, reads the lines from its starting offset to its ending offset, and returns its local array. Some care is needed when a starting or ending offset lands in the middle of a line, so that every line is read by exactly one rank. Finally, the framework assembles the distributed array using the DistributedDimension property.

The sample CSV file reader can be downloaded from the “Cloud Numerics” page on the Microsoft Connect site. To request access to the site, navigate to this link: sign-up here. Once you are registered for the Microsoft codename “Cloud Numerics” lab, you can download the software with this link: download here.
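The offset scheme can be sketched as follows. This is not the downloadable sample's code: CsvByteRanges, ComputeRanges, and ReadLinesInRange are hypothetical names, the data is an in-memory byte[] rather than a file, offsets are assumed to be byte offsets, and parsing each line into numbers is left out. The rule used here to resolve mid-line offsets is that a rank owns every line whose first byte falls in its [start, end) range: it skips a partial line at its start and finishes a partial line at its end.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helpers illustrating a byte-offset partitioning of a CSV file.
// A real reader would seek within a FileStream instead of indexing a byte[].
public static class CsvByteRanges
{
    // Master side: split [0, fileLength) into totalRanks contiguous (start, end) pairs.
    public static long[][] ComputeRanges(long fileLength, int totalRanks)
    {
        var ranges = new long[totalRanks][];
        for (int i = 0; i < totalRanks; i++)
        {
            long start = fileLength * i / totalRanks;
            long end = fileLength * (i + 1) / totalRanks;
            ranges[i] = new[] { start, end };
        }
        return ranges;
    }

    // Worker side: return every line whose first byte lies in [start, end).
    public static List<string> ReadLinesInRange(byte[] data, long start, long end)
    {
        long pos = start;

        // Landed mid-line: the previous rank owns this line, so skip past its '\n'.
        if (pos > 0 && data[pos - 1] != (byte)'\n')
        {
            while (pos < data.Length && data[pos] != (byte)'\n') pos++;
            pos++;   // step over the '\n' itself
        }

        var lines = new List<string>();
        while (pos < end && pos < data.Length)
        {
            // Read the whole line, even if it runs past 'end'.
            long lineStart = pos;
            while (pos < data.Length && data[pos] != (byte)'\n') pos++;
            int len = (int)(pos - lineStart);
            // Decode the line and drop a trailing '\r' from Windows line endings.
            lines.Add(Encoding.UTF8.GetString(data, (int)lineStart, len).TrimEnd('\r'));
            pos++;   // step over the '\n'
        }
        return lines;
    }
}
```

With the 16-byte input "1,2\n3,4\n5,6\n7,8\n" split across three ranks, the ranges are [0, 5), [5, 10), and [10, 16), and the ranks return the lines {1,2; 3,4}, {5,6}, and {7,8}: every line is read exactly once even though two of the boundaries land mid-line. In a reader class, the file name and length would live in fields (common information), and ComputeAssignment would return the ranges.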

Here is an example that reads a set of files:

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.Numerics.Distributed;
using Microsoft.Numerics.Distributed.IO;
using Microsoft.Numerics.Local;

public class SampleReader : IParallelReader<double>
{
    private string[] fileNamesArray = null;  // Cached list of file names
    private int distDim = 0;                 // Stack the local pieces along dimension 0 (rows)

    // Simple constructor – just store the file list
    public SampleReader(string[] inList)
    {
        fileNamesArray = inList;
    }

    // The required property for the distributed dimension
    public int DistributedDimension
    {
        get { return distDim; }
        set { distDim = value; }
    }

    // Parcel out sets of files to the ranks
    public object[] ComputeAssignment(int totalRanks)
    {
        int numFiles = fileNamesArray.Length;

        // How many files to give to each rank?
        int chunkSize = Math.Max(1, numFiles / totalRanks);

        string[][] result = new string[totalRanks][];

        // Go through the list of files and allocate chunkSize pieces
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            if (loc < numFiles)
            {
                // There are still files to process
                int num = 0;
                if (i == totalRanks - 1)
                {
                    // The last guy gets the rest
                    num = numFiles - loc;
                }
                else
                {
                    // Take as many as I can
                    num = Math.Min(chunkSize, numFiles - loc);
                }

                // Copy the names to the result array
                result[i] = new string[num];
                for (int j = 0; j < num; j++)
                {
                    result[i][j] = fileNamesArray[loc];
                    loc++;
                }
            }
            else
            {
                result[i] = new string[0];
            }
        }
        return (object[])result;
    }

    // Read your set of files to determine your contribution to the final result
    public Microsoft.Numerics.Local.NumericDenseArray<double> ReadWorker(object arg)
    {
        string[] myFiles = (string[])arg;

        if (myFiles.Length == 0)
        {
            // This rank was assigned no files, so it contributes no local piece
            return null;
        }

        // Read my files, accumulating them into result
        Microsoft.Numerics.Local.NumericDenseArray<double> myResult = null;

        for (int i = 0; i < myFiles.Length; i++)
        {
            // Have to write ReadSingleFile yourself
            var fileResult = ReadSingleFile(myFiles[i]);

            // Have to write the combine function yourself for now
            // We will soon have this function available
            myResult = Concatenate(myResult, fileResult, 0);
        }

        // Return my local piece and the framework will stitch them up
        return myResult;
    }

    // Combine two matrices by row
    private static Microsoft.Numerics.Local.NumericDenseArray<double> Concatenate(Microsoft.Numerics.Local.NumericDenseArray<double> mat1, Microsoft.Numerics.Local.NumericDenseArray<double> mat2, int dim)
    {
        // Ensure distributed dimension is 0
        if (dim != 0)
        {
            throw new ArgumentException("dim must be 0");
        }

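        // If either argument is null, just return the other one. ReferenceEquals
        // avoids invoking an overloaded == operator, if the array type defines one.
        if (ReferenceEquals(mat1, null))
        {
            return mat2;
        }

        if (ReferenceEquals(mat2, null))
        {
            return mat1;
        }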

        int i, j;
        var shape1 = mat1.Shape.ToArray();
        var shape2 = mat2.Shape.ToArray();

        if (shape1.Length != 2)
        {
            throw new ArgumentException("mat1 must be two dimensional");
        }

        if (shape2.Length != 2)
        {
            throw new ArgumentException("mat2 must be two dimensional");
        }

        if (shape1[1] != shape2[1])
        {
            throw new ArgumentException("Both inputs must have the same number of columns");
        }

        var result = Microsoft.Numerics.Local.NumericDenseArrayFactory.Zeros<double>(shape1[0] + shape2[0], shape1[1]);

        // Copy the first
        for (i = 0; i < shape1[0]; i++)
        {
            for (j = 0; j < shape1[1]; j++)
            {
                result[i, j] = mat1[i, j];
            }
        }

        // Copy the second below the first
        for (i = 0; i < shape2[0]; i++)
        {
            for (j = 0; j < shape2[1]; j++)
            {
                result[shape1[0] + i, j] = mat2[i, j];
            }
        }

        return result;
    }

    private static Microsoft.Numerics.Local.NumericDenseArray<double> ReadSingleFile(string fileName)
    {
        // Left for you to write: read fileName into a local array
        ...
    }
}
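One design choice in SampleReader.ComputeAssignment is that the last rank absorbs every leftover file: with 10 files and 4 ranks it receives 4 files while each of the others receives 2. If that imbalance matters, a common alternative is to give one extra file to each of the first numFiles % totalRanks ranks. The sketch below is a hypothetical standalone helper (BalancedAssignment.Split is not part of “Cloud Numerics”). It keeps each rank's files contiguous and in order, so, assuming the framework stacks the local pieces in rank order, the rows still follow the original file order.

```csharp
using System;

// Hypothetical alternative to SampleReader.ComputeAssignment: spread the
// remainder so that no two ranks differ by more than one file.
public static class BalancedAssignment
{
    public static string[][] Split(string[] fileNames, int totalRanks)
    {
        if (totalRanks <= 0)
            throw new ArgumentOutOfRangeException(nameof(totalRanks));

        int baseCount = fileNames.Length / totalRanks;   // every rank gets at least this many
        int extra = fileNames.Length % totalRanks;       // the first 'extra' ranks get one more

        var result = new string[totalRanks][];
        int loc = 0;
        for (int i = 0; i < totalRanks; i++)
        {
            int num = baseCount + (i < extra ? 1 : 0);
            result[i] = new string[num];                 // empty when there are more ranks than files
            Array.Copy(fileNames, loc, result[i], 0, num);
            loc += num;
        }
        return result;
    }
}
```

Inside SampleReader, ComputeAssignment could then return (object[])BalancedAssignment.Split(fileNamesArray, totalRanks). With 10 files and 4 ranks this yields 3, 3, 2, and 2 files; as in the original, ranks beyond the number of files receive empty arrays.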