Hadoop .Net HDFS File Access (Revisited Archived)

Updated post can be found here: https://blogs.msdn.com/b/carlnol/archive/2013/02/08/hdinsight-net-hdfs-file-access.aspx

Provided with the Microsoft Distribution of Hadoop, in addition to the C library, a Managed C++ solution for HDFS file access is provided. This solution enables one to consume HDFS files from within a .Net environment. The purpose of this post is first to ensure folks know about the new Windows HDFS Managed library (WinHdfsManaged), provided alongside the native C library, and secondly to give a few samples of its usage from C#.

Class Structure

Let’s start with a simple class diagram of the Win HDFS Managed library:

WinHdfsManagedModel

The main premise is that the HdfsFileSystem is your starting point, from which one can acquire a HdfsFileStream or a HdfsFileHandle. From the HdfsFileStream you can perform operations one would normally expect when working with .Net Streams. From the HdfsFileHandle you can perform operations analogous to normal HDFS file operations.

For brevity I have excluded samples using the HdfsFileHandle. So let’s run through some sample file operations.

Directory Operations

As in all operations one firstly needs to get a connection to the HDFS cluster. This is achieved by calling a Connect() method and specifying the host, name or IP address, and access port:

Create File System Access

  1. using (HdfsFileSystem hdfsSystem = HdfsFileSystem.Connect("127.0.0.1", 9000))
  2. {
  3.     ...
  4. }

Once one has the connection one can then easily perform a directory traversal to enquire into the files and directories:

List Directory Structure

  1. Action<string> processDirectory = null;
  2. processDirectory = (looppath) =>
  3. {
  4.     using (HdfsFileInfoEntries entries = hdfsSystem.ListDirectory(looppath))
  5.     {
  6.         foreach (HdfsFileInfoEntry entry in entries.Entries)
  7.         {
  8.             string kind = entry.Kind == HdfsFileInfoEntryKind.Directory ? "Directory" : "\tFile";
  9.             Console.WriteLine(string.Format(@"{0}:""{1}"", Modified/Accessed:""{2:G}, {3:G}"", Owner:""{4}""", kind, entry.Name, entry.LastModified, entry.LastAccessed, entry.Owner));
  10.             if (entry.Kind == HdfsFileInfoEntryKind.Directory)
  11.             {
  12.                 processDirectory(entry.Name);
  13.             }
  14.         }
  15.     }
  16. };
  17. processDirectory(hdfspath)

Here is a sample output created from the test application:

  1. Directory:"hdfs://127.0.0.1:9000/user/isotope/qwanchi", Modified/Accessed:"30/01/2012 20:46:38, 01/01/1970 00:00:00", Owner:"isotope"
  2.         File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/MobileSampleData.txt", Modified/Accessed:"30/01/2012 20:46:38, 30/01/2012 20:46:38", Owner:"isotope"
  3. Directory:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/duplicate", Modified/Accessed:"30/01/2012 20:46:38, 01/01/1970 00:00:00", Owner:"isotope"
  4.         File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/duplicate/testdata.txt", Modified/Accessed:"30/01/2012 20:46:38, 30/01/2012 20:46:38", Owner:"isotope"
  5.         File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/testdata.txt", Modified/Accessed:"28/01/2012 20:46:38, 29/01/2012 20:46:38", Owner:"isotope"

In addition to getting directory information one can also query on a file or directory directly: 

Get Path Information

  1. hdfsSystem.SetWorkingDirectory(hdfspath);
  2. using (HdfsFileInfoEntry pathinfo = hdfsSystem.GetPathInfo(hdfspath))
  3. {
  4.     if (pathinfo != null)
  5.     {
  6.         string kind = pathinfo.Kind == HdfsFileInfoEntryKind.Directory ? "Directory" : "\tFile";
  7.         Console.WriteLine(string.Format(@"{0}:""{1}"", Modified/Accessed:""{2:G}, {3:G}"", Owner:""{4}""", kind, pathinfo.Name, pathinfo.LastModified, pathinfo.LastAccessed, pathinfo.Owner));
  8.     }
  9. }

The HdfsFileSystem class also supports other operations such as copying and moving files, file renaming, deleting files, modifying security, checking a file exists, etc. The copy and move operations support copying and moving these files between systems.

So now onto creating and reading files.

Reading Files

Processing HDFS files is not that dissimilar from normal .Net file operations. Once one has opened a file for reading, operations are available for operations such as reading a byte, line, or block of bytes:

Reading Stream File Data

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Write, chunksize))
  2. {
  3.     file.Write(dataBytes, 0, data.Length);
  4.     file.WriteByte((byte)47);
  5.     file.Flush();
  6. }

The OpenFile operations support parameter overrides for the file block size and replication factors, whereas a value of zero implies the default values will be used.

If one wants to read the full contents of a file into a second Stream, the HdfsFileStream makes this a simple process:

Reading a File by Stream

  1. using (HdfsFileStream hdfsStream = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Read))
  2. {
  3.     using (FileStream fileStream = new FileStream(localfilestream, FileMode.Create, FileAccess.Write))
  4.     {
  5.         hdfsStream.CopyTo(fileStream);
  6.     }
  7. }

There are other options available for reading the full contents of a file. The first option is to perform a ReadLine() until a null is returned, processed using a StreamReader:

Writing a HDFS to Local Stream

  1. using (StreamReader reader = new StreamReader(hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Read, chunksize)))
  2. {
  3.     using (StreamWriter writer = new StreamWriter(localfileline, false, Encoding.UTF8))
  4.     {
  5.         string line;
  6.         while ((line = reader.ReadLine()) != null)
  7.         {
  8.             writer.WriteLine(line);
  9.         }
  10.     }
  11. }

Alternatively, for more efficient reading of files, one can read the blocks of data into a byte array: 

Reading a File in Bytes

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Read))
  2. {
  3.     while ((chunk = file.Read(readBytes, 0, chunksize)) > 0)
  4.     {
  5.         Console.Write(Encoding.UTF8.GetString(readBytes, 0, chunk));
  6.     }
  7. }

Other operations that are supported are PositionalReadByte(), PositionalReadBytes(), and Seek(). These operations allow reading the contents of a file from specific positions.

One final sample worth noting is copying a HDFS file to a local file using byte reads:

Writing a HDFS to Local File

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Read, chunksize))
  2. {
  3.     using (FileStream stream = new FileStream(localfilewrite, FileMode.Create, FileAccess.Write))
  4.     {
  5.         while ((chunk = file.Read(readBytes, 0, chunksize)) > 0)
  6.         {
  7.             stream.Write(readBytes, 0, chunk);
  8.         }
  9.     }
  10. }

The reason a chunk size is specified in this case is to sync the size being used for HDFS file access to the byte array used for writing the local file.

If one has a Stream reference one can also get the associated file information:

Get File Information

  1. HdfsFileInfoEntry fileinfo = file.GetInformation();
  2. if (fileinfo != null)
  3. {
  4.     Console.WriteLine(string.Format(@"'{0}', Modified/Accessed:""{1:G}, {2:G}"", Owner:""{3}""", fileinfo.Name, fileinfo.LastModified, fileinfo.LastAccessed, fileinfo.Owner));
  5. }

Also one can modify the file properties:

Modifying File Properties

  1. file.Chown("isotope", null);
  2. file.SetTimes(DateTime.Now.AddDays(-2), DateTime.Now.AddDays(-1));

So now onto writing files.

Writing Files

As in the case for reading, writing operations are supported for writing a byte, line, and block of bytes:

Writing File Stream Data

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Write, chunksize))
  2. {
  3.     file.Write(dataBytes, 0, data.Length);
  4.     file.WriteByte((byte)47);
  5.     file.Flush();
  6. }

The chunk size when opening a file is set to correspond to the size of the buffer used for writing the data.

As in the reading case, if one wants to copy a file from the local file system to an HDFS file one would write:

Writing a Local to HDFS Stream

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Write, chunksize))
  2. {
  3.     using (FileStream stream = new FileStream(localfilepath, FileMode.Open, FileAccess.Read))
  4.     {
  5.         while ((chunk = stream.Read(localbytes, 0, chunksize)) > 0)
  6.         {
  7.             file.Write(localbytes, 0, chunk);
  8.         }
  9.     }
  10.  
  11.     file.Flush();
  12. }

All one has to do is read, in byte chunks, data from the local file and write the corresponding bytes to the HDFS file.

Of course one can also use the CopyTo operation:

CopyTo Local to HDFS Stream

  1. using (HdfsFileStream file = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Write, chunksize))
  2. {
  3.     using (FileStream stream = new FileStream(localfilepath, FileMode.Open, FileAccess.Read))
  4.     {
  5.         stream.CopyTo(file);
  6.     }
  7. }

A quick word is warranted on appending to a file. Although the API currently supports open files for Append, this is only supported in Hadoop version 1.0.0 and above.

Building the Library

The code for the managed and unmanaged libraries for HDFS file access can be found in the folder:

C:\Apps\dist\contrib\WinLibHdfs

The download not only consists of the compiled libraries but also the full source code and sample C# application that this post is based upon. You can compile the source or just use the delivered assemblies. The source supports both x86 and x64 compilations. However one has to remember that if one does a 32-bit compilation then a 32-bit version of the JRE will also be required.

One final word is warranted about environment variables. As the C library being used by the Managed wrapper is actually calling Java code, one needs to define some additional directories in the Path and CLASSPATH environment variables. A recent addition to the code no manages this for the process. Before a connection is made through the HdfsFileSystem, the Hadoop installation and JRE path are located and the Path and CLASSPATH are adjusted accordingly.