Creating FTP data movement activity for Azure Data Factory Pipeline

Recently I participated in a hackathon with one of my customers to help them get up to speed on Azure's Big Data and Advanced Analytics platform, especially around data movement using Azure Data Factory. One of their requirements was to process data stored at an FTP location. Since FTP is not a supported data store (for now), we created a custom activity to download data from the FTP site and upload it to blob storage for processing. Here is a quick walkthrough of how to create, test, and deploy the FTP custom activity using Visual Studio.

High Level Steps

  1. Prerequisites
    • Data Factory
    • Storage Account to store FTP Data and Custom Activity Code.
    • Batch Account and Pool to execute Custom Activity Code.
  2. Set up Azure KeyVault to manage FTP credentials.
  3. Create FTP Custom Activity.
  4. Enable Local Debugging and Testing.
  5. Create Data Factory Pipeline using Visual Studio
    1. Linked Services
      • Storage Service
      • Batch Service
    2. Tables
      • Output Dataset
    3. Pipeline
      • Custom Pipeline
    4. Add Custom Activity reference in Pipeline Project for automatic packaging and deployment
  6. Publish and Test

Prerequisites

- Create a Data Factory and a Storage Account using the Azure Portal

[screenshot: adf-create, storage-ac]

- Connect to your storage account using Storage Explorer or Cloud Explorer in Visual Studio and create two containers (a scripted alternative is sketched after this list):
  1) "ftpactivitycode" - to store the custom activity DLL
  2) "ftpdata" - to store the files coming from FTP
- Create a Batch Account and Pool to execute the Custom Activity code (follow the Azure Batch prerequisites instructions from here)

[screenshot: batch-1]
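
If you prefer scripting the container setup, here is a minimal PowerShell sketch, assuming the same AzureRM-era Azure.Storage cmdlets used in the KeyVault section below; the account name and key are placeholders to fill in:

# Build a storage context and create the two containers
$ctx = New-AzureStorageContext -StorageAccountName '[Your Storage Account]' -StorageAccountKey '[Your Storage Account Key]'
New-AzureStorageContainer -Name 'ftpactivitycode' -Context $ctx   # custom activity DLL
New-AzureStorageContainer -Name 'ftpdata' -Context $ctx           # files coming from FTP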

Set up Azure KeyVault to manage FTP credentials

- Use PowerShell to create a new Azure KeyVault and store the FTP password as a Secret

# Login to Azure and select your subscription
Add-AzureRmAccount
Select-AzureRmSubscription -SubscriptionName "[Your Subscription Name]"

# Create new KeyVault
New-AzureRmKeyVault -VaultName 'FTPVault' -ResourceGroupName 'FTPDataFactory' -Location 'West US'

# Add a secret in KeyVault
$secretvalue = ConvertTo-SecureString '[Your FTP Password]' -AsPlainText -Force
$key = Set-AzureKeyVaultSecret -VaultName 'FTPVault' -Name 'FTPPassword' -SecretValue $secretvalue

# Get the Secret Id
Get-AzureKeyVaultSecret -VaultName 'FTPVault'
- Register a new application with Azure Active Directory and copy the Client ID and Key; a minimal PowerShell sketch follows.

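A minimal sketch of that registration in PowerShell, assuming the AzureRM.Resources cmdlets are available; the display name, URIs, and password are illustrative placeholders (the Azure Portal works just as well):

# Register an AAD application and create a service principal for it
$app = New-AzureRmADApplication -DisplayName 'FTPVaultClient' -HomePage 'https://ftpvaultclient' -IdentifierUris 'https://ftpvaultclient' -Password '[Your Application Key]'
New-AzureRmADServicePrincipal -ApplicationId $app.ApplicationId

# $app.ApplicationId is the Client ID; the -Password value above is the Key
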
- Set Access Policy on KeyVault to allow the application to use the Secret.

Set-AzureRmKeyVaultAccessPolicy -VaultName 'FTPVault' -ServicePrincipalName '[Your Client ID here]' -PermissionsToSecrets get

Create FTP Custom Activity

- Create a new .NET 4.5 Class Library project in Visual Studio

[screenshot: vs]

- Add NuGet packages using the Package Manager Console
Install-Package Microsoft.Azure.Management.DataFactories
Install-Package WindowsAzure.Storage
Install-Package Microsoft.Azure.KeyVault
Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
- Create KeyVaultHelper class:

using Microsoft.Azure.KeyVault;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using System.Threading.Tasks;

namespace ADFCustomActivities
{
    public class KeyVaultHelper
    {
        string appId;
        string appKey;

        public KeyVaultHelper(string clientId, string clientSecret)
        {
            appId = clientId;
            appKey = clientSecret;
        }

        // Fetch a secret's value from KeyVault using its full secret URI
        public string GetSecret(string secretId)
        {
            var keyClient = new KeyVaultClient(new KeyVaultClient.AuthenticationCallback(GetAccessToken));
            return keyClient.GetSecretAsync(secretId).Result.Value;
        }

        // ADAL callback: authenticate as the AAD application using its client id and key
        private async Task<string> GetAccessToken(string authority, string resource, string scope)
        {
            var adCredential = new ClientCredential(appId, appKey);
            var authenticationContext = new AuthenticationContext(authority, null);
            var result = await authenticationContext.AcquireTokenAsync(resource, adCredential);
            return result.AccessToken;
        }
    }
}
- Create FTPActivity class:

using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;

namespace ADFCustomActivities
{
    public class FTPActivity : IDotNetActivity
    {
        string ftpFileUrlKey = "FTPFileUrl";
        string clientId = "[Your Client Id]";  //application client id
        string clientSecret = "[Your Client Secret]";  //application key
        string keyVaultSecretId = "https://ftpvault.vault.azure.net:443/secrets/FTPPassword";
        string ftpUserName = "[Your FTP Username]";

        public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices,
                          IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
        {
            //Get FTP URL from extended properties
            var currentActivity = (DotNetActivity)activity.TypeProperties;
            var extendedProperties = currentActivity.ExtendedProperties;
            if (!extendedProperties.Any(k => k.Key == ftpFileUrlKey))
            {
                logger.Write("ERROR: FTPUrl not found in extended properties.");
                return new Dictionary<string, string>();
            }
            var ftpFileUrl = extendedProperties[ftpFileUrlKey];

            //Get FTP Password from KeyVault
            var ftpPassword = new KeyVaultHelper(clientId, clientSecret).GetSecret(keyVaultSecretId);

            //Get blob connection string from Output dataset
            var outputDataset = datasets.Single(dataset => dataset.Name == activity.Outputs.Single().Name);
            var outputLinkedService = linkedServices.Last(linkedService =>
                                      linkedService.Name == outputDataset.Properties.LinkedServiceName)
                                      .Properties
                                      .TypeProperties as AzureStorageLinkedService;
            var blobConnectionString = outputLinkedService.ConnectionString;

            //Get blob folder from Output dataset
            var outputLocation = outputDataset.Properties.TypeProperties as AzureBlobDataset;
            var folderPath = outputLocation.FolderPath;

            //Upload file to blob
            UploadFileToBlob(ftpFileUrl, ftpUserName, ftpPassword, blobConnectionString, folderPath, logger);

            return new Dictionary<string, string>();
        }

        //Downloads the file from the FTP location and streams it directly into blob storage
        public void UploadFileToBlob(string ftpFileUrl, string ftpUserName, string ftpPassword,
                            string blobConnectionString, string blobFolderPath, IActivityLogger logger)
        {
            var request = (FtpWebRequest)WebRequest.Create(new Uri(ftpFileUrl));
            request.Credentials = new NetworkCredential(ftpUserName, ftpPassword);
            request.UseBinary = true;
            request.UsePassive = true;
            request.KeepAlive = true;
            request.Method = WebRequestMethods.Ftp.DownloadFile;

            using (var ftpResponse = (FtpWebResponse)request.GetResponse())
            using (var ftpStream = ftpResponse.GetResponseStream())
            {
                logger.Write("connecting to blob..");
                var outputStorageAccount = CloudStorageAccount.Parse(blobConnectionString);

                var fileName = Path.GetFileName(ftpFileUrl);
                var outputBlobUri = new Uri(outputStorageAccount.BlobEndpoint,
                                            blobFolderPath + "/" + fileName);
                var outputBlob = new CloudBlockBlob(outputBlobUri, outputStorageAccount.Credentials);

                logger.Write("uploading to blob URI: {0}", outputBlobUri.ToString());
                outputBlob.UploadFromStream(ftpStream);

                logger.Write("uploading succeeded");
            }
        }
    }
}

Enable Local Debugging and Testing

- Add a new Console Application

[screenshot: console-app]

- Add the same NuGet packages as in the FTP Custom Activity project
- Create 'ConsoleActivityLogger' class:

using Microsoft.Azure.Management.DataFactories.Runtime;
using System;

namespace ADFCustomActivities.Test
{
    // Sends the activity's log output to the console during local runs
    public class ConsoleActivityLogger : IActivityLogger
    {
        public void Write(string format, params object[] args)
        {
            Console.WriteLine(format, args);
        }
    }
}
- Update the code in the Main method:

using System;

namespace ADFCustomActivities.Test
{
    class Program
    {
        static void Main(string[] args)
        {
            var ftpActivity = new FTPActivity();

            string ftpFileUrl = "ftp://[Your FTP Domain]/myftpdata.zip";
            string clientId = "[Your Client Id]";  //application client id
            string clientSecret = "[Your Client Secret]";  //application key
            string keyVaultSecretId = "https://ftpvault.vault.azure.net:443/secrets/FTPPassword";
            string ftpUserName = "[Your FTP Username]";
            string ftpPassword = string.Empty;
            string blobConnectionString = "[Your Storage Connectionstring]";
            string blobFolderPath = "ftpdata";

            ftpPassword = new KeyVaultHelper(clientId, clientSecret).GetSecret(keyVaultSecretId);

            ftpActivity.UploadFileToBlob(ftpFileUrl, ftpUserName, ftpPassword, blobConnectionString, 
                                         blobFolderPath, new ConsoleActivityLogger());

            Console.WriteLine("Press Any Key to Exit...");
            Console.ReadKey();
        }
    }
}
- Run the Console App to test & debug your code.

[screenshot: debug-view]

Create Data Factory Pipeline using Visual Studio

- Add a new Empty Data Factory Project. (Install the Data Factory Tools for Visual Studio extension to see the project template.)

[screenshot: datafactory-vs]

- Add Storage Linked Service

{
  "$schema": "https://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "FTPStorageLinkedService",
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "[Your Storage Connectionstring]"
    }
  }
}
- Add Batch Linked Service

{
  "$schema": "https://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "FTPBatchLinkedService",
  "properties": {
    "type": "AzureBatch",
    "typeProperties": {
      "accountName": "customftpactivity",
      "accessKey": "[Your Batch Access Key]",
      "poolName": "ftpactivitypool",
      "batchUri": "https://centralus.batch.azure.com",
      "linkedServiceName": "FTPStorageLinkedService"
    }
  }
}
- Add Output Dataset (under Table)
{
  "$schema": "https://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "FTPBlobOutput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "FTPStorageLinkedService",
    "typeProperties": {
      "folderPath": "ftpdata"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
- Add Custom Pipeline (note that the pipeline's end time must be later than its start time for slices to be produced; the example below keeps the pipeline active for one day)

{
  "$schema": "https://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "FTPActivity",
  "properties": {
    "description": "Download FTP Data and Upload to Blob",
    "activities": [
      {
        "name": "DotNetActivityTemplate",
        "type": "DotNetActivity",
        "outputs": [
          {
            "name": "FTPBlobOutput"
          }
        ],
        "typeProperties": {
          "assemblyName": "ADFCustomActivities",
          "entryPoint": "ADFCustomActivities.FTPActivity",
          "packageLinkedService": "FTPStorageLinkedService",
          "packageFile": "ftpactivitycode/ADFCustomActivities.zip",
          "extendedProperties": {
            "FTPFileUrl": "ftp://[Your FTP File domain]/myftpdata.zip"
          }
        },
        "linkedServiceName": "FTPBatchLinkedService",
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "00:05:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2016-07-01T00:00:00Z",
    "end": "2016-07-01T00:00:00Z"
  }
}
- Add a reference to the FTP Custom Activity project to enable automatic packaging and upload of the DLLs to the storage account; a manual packaging sketch follows the screenshot.

[screenshot: add-reference]
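
If you ever need to package and upload the activity manually, the following PowerShell sketch approximates what the build step does; the build output path, account name, and key are assumptions to adjust for your setup:

# Zip the build output and upload it to the 'ftpactivitycode' container
Compress-Archive -Path '.\bin\Release\*' -DestinationPath '.\ADFCustomActivities.zip' -Force
$ctx = New-AzureStorageContext -StorageAccountName '[Your Storage Account]' -StorageAccountKey '[Your Storage Account Key]'
Set-AzureStorageBlobContent -File '.\ADFCustomActivities.zip' -Container 'ftpactivitycode' -Blob 'ADFCustomActivities.zip' -Context $ctx -Force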

Publish and Test

- Right-click on the Data Factory project and click 'Publish'. Select the Data Factory created in the first step and continue.

[screenshots: publish-1, publish-2]

- Open the Datasets Blade for the Data Factory.

[screenshot: publish-3]

- Under Monitoring, click on the most recently updated slice. The status of the slice should be 'Ready' and the last activity run status should be 'Succeeded'.

[screenshot: publish-4]

- Verify that the FTP file has been uploaded to the storage account; a command-line check is sketched below.

[screenshot: publish-5]
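
You can also check from PowerShell; a minimal sketch using the Azure.Storage cmdlets (account name and key are placeholders):

# List the blobs in the 'ftpdata' container to confirm the upload
$ctx = New-AzureStorageContext -StorageAccountName '[Your Storage Account]' -StorageAccountKey '[Your Storage Account Key]'
Get-AzureStorageBlob -Container 'ftpdata' -Context $ctx | Select-Object Name, Length, LastModified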

Summary

Azure Data Factory supports many data stores today, and more will be added over time. If your data store is not on the list, custom activities are your best friend. With Visual Studio integration, it is easy to create and manage custom activities as well as Data Factory pipelines.

Additional Resources