How to store Event Hub events to Azure SQL Database


Introduction

This sample shows how to use the EventProcessorHost to retrieve events from an Event Hub and store them in batches in an Azure SQL Database. The solution demonstrates how to use the following techniques:

  • Send events to an Event Hub using both AMQP and HTTPS transport protocols.
  • Create an entity level shared access policy with only the Send claim. This key will be used to create SAS tokens, one for each publisher endpoint. 
  • Issue a SAS token to secure individual publisher endpoints.
  • Use a SAS token to authenticate at a publisher endpoint level.
  • Use the EventProcessorHost to retrieve and process events from an event hub.
  • Perform structured and semantic logging using a custom EventSource class and the ETW log support introduced by Azure SDK 2.5.
  • Use a stored procedure with a Table-Valued Parameter to store multiple events in a batch mode to a table on an Azure SQL database.
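The SAS-related items in the list above can be sketched as follows. This is a minimal illustration, not the sample's own code: it computes a token for one publisher endpoint by hand with HMAC-SHA256, following the publicly documented Service Bus SAS token format. The PublisherSasToken class and its Create method are hypothetical names, and the namespace, event hub, publisher and key values are placeholders to be supplied by the caller.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of the sample solution.
public static class PublisherSasToken
{
    // Builds a SAS token scoped to a single publisher endpoint of an event hub.
    // keyName/key are assumed to belong to a policy that holds only the Send claim.
    public static string Create(string serviceNamespace, string eventHub, string publisher,
                                string keyName, string key, TimeSpan timeToLive)
    {
        // Publisher endpoint: https://{namespace}.servicebus.windows.net/{eventHub}/publishers/{publisher}
        var resourceUri = string.Format(CultureInfo.InvariantCulture,
                                        "https://{0}.servicebus.windows.net/{1}/publishers/{2}",
                                        serviceNamespace, eventHub, publisher);
        var encodedUri = Uri.EscapeDataString(resourceUri);

        // The expiry is expressed in seconds since the Unix epoch (UTC).
        var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var expirySeconds = (long)(DateTime.UtcNow.Add(timeToLive) - epoch).TotalSeconds;
        var expiry = expirySeconds.ToString(CultureInfo.InvariantCulture);

        // The signature is HMAC-SHA256 over "<encoded uri>\n<expiry>", keyed with the policy key.
        string signature;
        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(encodedUri + "\n" + expiry));
            signature = Convert.ToBase64String(hash);
        }

        return string.Format(CultureInfo.InvariantCulture,
                             "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                             encodedUri, Uri.EscapeDataString(signature), expiry, keyName);
    }
}
```

Because the resource URI includes the publisher name, a token issued this way should only be valid for that publisher endpoint, which is what allows each simulated device to receive its own credential.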

You can download the code from MSDN Code Gallery.

Scenario

This solution simulates an Internet of Things (IoT) scenario where thousands of devices send events (e.g. sensor readings) to a backend system via a message broker. The backend system retrieves events from the messaging infrastructure and stores them in a persistent repository in a scalable manner.

Architecture

The sample is structured as follows:

  • A Windows Forms application can be used to create an event hub and an entity level shared access policy with only the Send access right. 
  • The same application can be used to simulate a configurable number of devices that send readings to the event hub. Each device uses a separate publisher endpoint to send data to the underlying event hub and a separate SAS token to authenticate with the Service Bus namespace.
  • An event hub is used to ingest device events.
  • A worker role with multiple instances uses an EventProcessorHost to read and process messages from the partitions of the event hub.
  • The custom EventProcessor class uses a Table-Valued Parameter to insert events into a table of a SQL Database in a batch mode by invoking a stored procedure.
  • The worker role uses a custom EventSource class and the ETW log support introduced by the Azure SDK 2.5 to write log data to table storage.
  • The stored procedure uses the MERGE statement to implement an UPSERT mechanism.

The following picture shows the architecture of the solution:

 

References

Event Hubs

Table-Valued Parameters

ETW Logs

Visual Studio Solution

The Visual Studio solution includes the following projects:
 
  • CreateIoTDbWithMerge.sql: this script can be used to create the SQL Server database used to store device events.
  • Entities: this library contains the Payload class. This class defines the structure and content of the EventData message body.
  • EventProcessorHostWorkerRole: this library defines the worker role used to handle the events from the event hub.
  • Helpers: this library defines the TraceEventSource class used by the worker role to create ETW logs at runtime.
  • Sender: this Windows Forms application can be used to create the event hub used by the sample and to simulate a configurable number of devices sending events to the event hub.
  • StoreEventsToAzureSqlDatabase: this Azure project defines the cloud service hosting the worker role used to handle the events from the event hub.

NOTE: To reduce the size of the zip file, I deleted the NuGet packages. To repair the solution, right-click the solution and select Enable NuGet Package Restore as shown in the picture below. For more information on this topic, see the following post.

Solution

This section briefly describes the individual components of the solution.

SQL Azure Database

Run the CreateIoTDbWithMerge.sql script to create the database used by the solution. In particular, the script creates the following artifacts:

  • The EventTableType user-defined table type.
  • The Events table used to store events.
  • The sp_InsertEvents stored procedure used to store events. The stored procedure receives a single EventTableType parameter and uses the MERGE statement to implement an UPSERT mechanism. This mechanism is used to implement idempotency: if a row with a given EventId already exists in the table, the stored procedure updates its columns; otherwise, a new row is inserted.
-- Drop sp_InsertEvents stored procedure 
IF OBJECT_ID('sp_InsertEvents') > 0 DROP PROCEDURE [dbo].[sp_InsertEvents] 
GO 
 
-- Drop Events table 
IF OBJECT_ID('Events') > 0 DROP TABLE [dbo].[Events] 
GO 
 
-- Drop EventTableType user-defined table type (only if it already exists) 
IF TYPE_ID('dbo.EventTableType') IS NOT NULL DROP TYPE [dbo].[EventTableType] 
GO 
 
-- Create EventTableType user-defined table type 
CREATE TYPE [dbo].[EventTableType] AS TABLE( 
    [EventId] [int] NOT NULL, 
    [DeviceId] [int] NULL, 
    [Value] [int] NULL, 
    [Timestamp] [datetime2](7) NULL 
) 
GO 
 
SET ANSI_NULLS ON 
GO 
 
SET QUOTED_IDENTIFIER ON 
GO 
 
-- Create Events table 
CREATE TABLE [dbo].[Events]( 
    [EventId] [int] NOT NULL, 
    [DeviceId] [int] NOT NULL, 
    [Value] [int] NOT NULL, 
    [Timestamp] [datetime2](7) NULL, 
PRIMARY KEY CLUSTERED  
( 
    [EventId] ASC 
)WITH (PAD_INDEX = OFF,  
       STATISTICS_NORECOMPUTE = OFF,  
       IGNORE_DUP_KEY = OFF,  
       ALLOW_ROW_LOCKS = ON,  
       ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] 
) ON [PRIMARY] 
GO 
 
-- Create sp_InsertEvents stored procedure 
CREATE PROCEDURE [dbo].[sp_InsertEvents]  
    @Events dbo.EventTableType READONLY 
AS  
BEGIN 
    MERGE INTO dbo.[Events] AS A 
    USING ( 
        SELECT * FROM @Events 
    ) B ON (A.EventId = B.EventId) 
    WHEN MATCHED THEN 
        UPDATE SET A.DeviceId = B.DeviceId, 
                   A.Value = B.Value, 
                   A.Timestamp = B.Timestamp 
    WHEN NOT MATCHED THEN 
        INSERT ([EventId], [DeviceId], [Value], [Timestamp])  
        VALUES(B.[EventId], B.[DeviceId], B.[Value], B.[Timestamp]); 
END 
GO
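
As a rough illustration of how a client might call this stored procedure, the sketch below passes a DataTable as the table-valued parameter through ADO.NET. It is not the sample's EventProcessor code: the EventBatchWriter class and its method names are hypothetical, and the connection string is assumed to be supplied by the caller. The DataTable columns are declared in the same order as in dbo.EventTableType, because table-valued parameter columns are matched by position.

```csharp
using System;
using System.Data;
using System.Data.SqlClient;

// Hypothetical helper, not part of the sample solution.
public static class EventBatchWriter
{
    // Builds an empty DataTable whose columns mirror dbo.EventTableType.
    public static DataTable CreateEventTable()
    {
        var table = new DataTable();
        table.Columns.Add("EventId", typeof(int));
        table.Columns.Add("DeviceId", typeof(int));
        table.Columns.Add("Value", typeof(int));
        table.Columns.Add("Timestamp", typeof(DateTime));
        return table;
    }

    // Sends every row of the table to sp_InsertEvents in a single round trip.
    public static void Write(string connectionString, DataTable events)
    {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand("dbo.sp_InsertEvents", connection))
        {
            command.CommandType = CommandType.StoredProcedure;

            // SqlDbType.Structured marks the parameter as table-valued;
            // TypeName must name the user-defined table type on the server.
            var parameter = command.Parameters.Add("@Events", SqlDbType.Structured);
            parameter.TypeName = "dbo.EventTableType";
            parameter.Value = events;

            connection.Open();
            command.ExecuteNonQuery();
        }
    }
}
```

A caller would fill the table with one row per event, for example table.Rows.Add(1, 42, 250, DateTime.UtcNow), and then call Write once per batch. Because of the MERGE statement, sending the same batch twice should leave a single row per EventId. Each EventId should appear only once within a batch, since duplicate keys in the source table would both take the INSERT branch and violate the primary key.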

Entities

The following table contains the code of the Payload class. This class defines the body of the messages sent to the event hub. Note that the properties of the class are decorated with the JsonPropertyAttribute: both the client application and the worker role use Json.NET to serialize the message content to JSON and deserialize it.
#region Using Directives 
using System; 
using Newtonsoft.Json;  
#endregion 
 
namespace Microsoft.AzureCat.Samples.Entities 
{ 
    [Serializable] 
    public class Payload 
    { 
        /// <summary> 
        /// Gets or sets the event id. 
        /// </summary> 
        [JsonProperty(PropertyName = "eventId", Order = 1)] 
        public int EventId { get; set; } 
 
        /// <summary> 
        /// Gets or sets the device id. 
        /// </summary> 
        [JsonProperty(PropertyName = "deviceId", Order = 2)] 
        public int DeviceId { get; set; } 
 
        /// <summary> 
        /// Gets or sets the device value. 
        /// </summary> 
        [JsonProperty(PropertyName = "value", Order = 3)] 
        public int Value { get; set; } 
 
        /// <summary> 
        /// Gets or sets the event timestamp. 
        /// </summary> 
        [JsonProperty(PropertyName = "timestamp", Order = 4)] 
        public DateTime Timestamp { get; set; } 
    } 
}
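
To make the wire format concrete, the following sketch serializes a payload with Json.NET and reads it back. It is an illustration only: the Payload class is a trimmed copy repeated here so the block is self-contained, the PayloadJsonExample class is a hypothetical name, and the property values are made up.

```csharp
using System;
using Newtonsoft.Json;

// Trimmed copy of the Payload class shown above.
public class Payload
{
    [JsonProperty(PropertyName = "eventId", Order = 1)]
    public int EventId { get; set; }

    [JsonProperty(PropertyName = "deviceId", Order = 2)]
    public int DeviceId { get; set; }

    [JsonProperty(PropertyName = "value", Order = 3)]
    public int Value { get; set; }

    [JsonProperty(PropertyName = "timestamp", Order = 4)]
    public DateTime Timestamp { get; set; }
}

// Hypothetical helper, not part of the sample solution.
public static class PayloadJsonExample
{
    // Produces the JSON text that would be used as the message body.
    public static string Serialize(Payload payload)
    {
        return JsonConvert.SerializeObject(payload);
    }

    // Rebuilds a Payload from the JSON text of a received message body.
    public static Payload Deserialize(string json)
    {
        return JsonConvert.DeserializeObject<Payload>(json);
    }
}
```

With Json.NET's default settings, a payload with EventId = 1, DeviceId = 42, Value = 250 and a UTC timestamp of 2015-01-15 10:30:00 serializes to {"eventId":1,"deviceId":42,"value":250,"timestamp":"2015-01-15T10:30:00Z"}. The JSON property names come from the JsonProperty attributes rather than from the C# property names.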

Helpers

This library defines the TraceEventSource class used by the worker role to trace events to ETW logs. The WAD agent running on the worker role instances will read data out of local ETW logs and persist this data to a couple of storage tables (WADDiagnosticTable and WADEventProcessorTable) in the storage account configured for Windows Azure Diagnostics.
#region Using Directives
using System;
using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;
using Microsoft.WindowsAzure.ServiceRuntime;

#endregion

namespace Microsoft.AzureCat.Samples.Helpers
{
    [EventSource(Name = "TraceEventSource")]
    public sealed class TraceEventSource : EventSource
    {
        #region Internal Enums
        public class Keywords
        {
            public const EventKeywords EventHub = (EventKeywords)1;
            public const EventKeywords DataBase = (EventKeywords)2;
            public const EventKeywords Diagnostic = (EventKeywords)4;
            public const EventKeywords Performance = (EventKeywords)8;
        }
        #endregion

        #region Public Static Properties
        public static readonly TraceEventSource Log = new TraceEventSource(); 
        #endregion

        #region Private Methods
        [Event(1,
               Message = "TraceIn",
               Keywords = Keywords.Diagnostic,
               Level = EventLevel.Verbose)]
        private void TraceIn(string application,
                             string instance,
                             Guid activityId,
                             string description,
                             string source,
                             string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance))
            {
                return;
            }
            WriteEvent(1, application, instance, activityId, description, source, method);
        }

        [Event(2,
               Message = "TraceOut",
               Keywords = Keywords.Diagnostic,
               Level = EventLevel.Verbose)]
        private void TraceOut(string application,
                              string instance,
                              Guid activityId,
                              string description,
                              string source,
                              string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance))
            {
                return;
            }
            WriteEvent(2, application, instance, activityId, description, source, method);
        }

        [Event(3,
               Message = "TraceApi",
               Keywords = Keywords.Diagnostic,
               Level = EventLevel.Informational)]
        private void TraceExec(string application,
                               string instance,
                               Guid activityId,
                               double elapsed,
                               string description,
                               string source,
                               string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(description))
            {
                return;
            }
            WriteEvent(3, application, instance, activityId, elapsed, description, source, method);
        }

        [Event(4,
               Message = "TraceInfo",
               Keywords = Keywords.Diagnostic,
               Level = EventLevel.Informational)]
        private void TraceInfo(string application,
                               string instance,
                               string description,
                               string source,
                               string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(description))
            {
                return;
            }
            WriteEvent(4, application, instance, description, source, method);
        }

        [Event(5,
               Message = "TraceError",
               Keywords = Keywords.Diagnostic,
               Level = EventLevel.Error)]
        private void TraceError(string application,
                                string instance,
                                Guid activityId,
                                string exception,
                                string innerException,
                                string source,
                                string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(exception))
            {
                return;
            }
            WriteEvent(5, application, instance, activityId, exception, string.IsNullOrWhiteSpace(innerException) ? string.Empty : innerException, source, method);
        }

        [Event(6,
               Message = "OpenPartition",
               Keywords = Keywords.EventHub,
               Level = EventLevel.Informational)]
        private void OpenPartition(string application, 
                                   string instance, 
                                   string eventHub, 
                                   string consumerGroup, 
                                   string partitionId, 
                                   string source, 
                                   string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            WriteEvent(6, application, instance, eventHub, consumerGroup, partitionId, source, method);
        }

        [Event(7,
               Message = "ClosePartition",
               Keywords = Keywords.EventHub,
               Level = EventLevel.Informational)]
        private void ClosePartition(string application, 
                                    string instance, 
                                    string eventHub, 
                                    string consumerGroup, 
                                    string partitionId, 
                                    string reason, 
                                    string source, 
                                    string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            WriteEvent(7, application, instance, eventHub, consumerGroup, partitionId, reason, source, method);
        }

        [Event(8,
               Message = "ProcessEvents",
               Keywords = Keywords.EventHub,
               Level = EventLevel.Informational)]
        private void ProcessEvents(string application, 
                                   string instance, 
                                   string eventHub, 
                                   string consumerGroup, 
                                   string partitionId, 
                                   int messageCount, 
                                   string source, 
                                   string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            if (IsEnabled())
            {
                WriteEvent(8, application, instance, eventHub, consumerGroup, partitionId, messageCount, source, method);
            }
        } 
        #endregion

        #region Public Methods
        [NonEvent]
        public void TraceApi(TimeSpan elapsed,
                             string description,
                             [CallerFilePath] string source = "",
                             [CallerMemberName] string method = "")
        {
            if (IsEnabled())
            {
                TraceExec(RoleEnvironment.CurrentRoleInstance.Role.Name,
                          RoleEnvironment.CurrentRoleInstance.Id,
                          Guid.NewGuid(),
                          elapsed.TotalMilliseconds,
                          description,
                          source,
                          method);
            }
        }

        [NonEvent]
        public void TraceApi(Guid activityId,
                             TimeSpan elapsed,
                             string description,
                             [CallerFilePath] string source = "",
                             [CallerMemberName] string method = "")
        {
            if (IsEnabled())
            {
                TraceExec(RoleEnvironment.CurrentRoleInstance.Role.Name,
                          RoleEnvironment.CurrentRoleInstance.Id,
                          activityId,
                          elapsed.TotalMilliseconds,
                          description,
                          source,
                          method);
            }
        }

        [NonEvent]
        public void TraceIn([CallerFilePath] string source = "",
                            [CallerMemberName] string method = "")
        {
            if (IsEnabled())
            {
                TraceIn(RoleEnvironment.CurrentRoleInstance.Role.Name,
                        RoleEnvironment.CurrentRoleInstance.Id,
                        Guid.NewGuid(),
                        string.Empty,
                        source,
                        method);
            }
        }

        [NonEvent]
        public void TraceOut([CallerFilePath] string source = "",
                             [CallerMemberName] string method = "")
        {
            if (IsEnabled())
            {
                TraceOut(RoleEnvironment.CurrentRoleInstance.Role.Name,
                         RoleEnvironment.CurrentRoleInstance.Id,
                         Guid.NewGuid(),
                         string.Empty,
                         source,
                         method);
            }
        }

        [NonEvent]
        public void TraceInfo(string description, 
                              [CallerFilePath] string source = "", 
                              [CallerMemberName] string method = "")
        {
            if (IsEnabled())
            {
                TraceInfo(RoleEnvironment.CurrentRoleInstance.Role.Name,
                          RoleEnvironment.CurrentRoleInstance.Id,
                          description,
                          source,
                          method);
            }
        }

        [NonEvent]
        public void TraceError(string exception, 
                               string innerException,
                               [CallerFilePath] string source = "", 
                               [CallerMemberName] string method = "")
        {
            if (string.IsNullOrWhiteSpace(exception))
            {
                return;
            }
            if (IsEnabled())
            {
                TraceError(RoleEnvironment.CurrentRoleInstance.Role.Name,
                           RoleEnvironment.CurrentRoleInstance.Id,
                           Guid.NewGuid(),
                           exception, 
                           string.IsNullOrWhiteSpace(innerException) ? string.Empty : innerException,
                           source,
                           method);
            }
        }

        [NonEvent]
        public void OpenPartition(string eventHub, 
                                  string consumerGroup, 
                                  string partitionId,
                                  [CallerFilePath] string source = "",
                                  [CallerMemberName] string method = "")
        {
            if (string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            if (IsEnabled())
            {
                OpenPartition(RoleEnvironment.CurrentRoleInstance.Role.Name,
                              RoleEnvironment.CurrentRoleInstance.Id,
                              eventHub,
                              consumerGroup,
                              partitionId,
                              source,
                              method);
            }
        }

        [NonEvent]
        public void ClosePartition(string eventHub, 
                                   string consumerGroup, 
                                   string partitionId,
                                   string reason,
                                   [CallerFilePath] string source = "",
                                   [CallerMemberName] string method = "")
        {
            if (string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            if (IsEnabled())
            {
                ClosePartition(RoleEnvironment.CurrentRoleInstance.Role.Name,
                               RoleEnvironment.CurrentRoleInstance.Id,
                               eventHub,
                               consumerGroup,
                               partitionId,
                               reason,
                               source,
                               method);
            }
        }

        [NonEvent]
        public void ProcessEvents(string eventHub, 
                                  string consumerGroup, 
                                  string partitionId, 
                                  int messageCount,
                                  [CallerFilePath] string source = "",
                                  [CallerMemberName] string method = "")
        {
            if (string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
            {
                return;
            }
            if (IsEnabled())
            {
                ProcessEvents(RoleEnvironment.CurrentRoleInstance.Role.Name,
                              RoleEnvironment.CurrentRoleInstance.Id,
                              eventHub,
                              consumerGroup,
                              partitionId,
                              messageCount,
                              source,
                              method);
            }
        }

        #endregion
    }
}

EventProcessorHostWorkerRole

The following table contains the code of the WorkerRole class. The class reads the following settings from the service configuration file and then creates an instance of the EventProcessorHost class:

  • Connection string of the Azure SQL Database where events are stored.
  • Connection string of the Service Bus namespace hosting the event hub used by the solution.
  • Connection string of the storage account used by the EventProcessorHost.
  • The name of the event hub.
  • The name of the consumer group used by the worker role to read events from the event hub.
#region Using Directives
using System;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.AzureCat.Samples.Helpers;
#endregion

namespace Microsoft.AzureCat.Samples.EventProcessorHostWorkerRole
{
    public class WorkerRole : RoleEntryPoint
    {
        #region Private Constants
        //*******************************
        // Messages & Formats
        //*******************************
        private const string RoleEnvironmentSettingFormat = "Configuration Setting [{0}] = [{1}].";
        private const string RoleEnvironmentConfigurationSettingChangedFormat = "The setting [{0}] is changed: new value = [{1}].";
        private const string RoleEnvironmentConfigurationSettingChangingFormat = "The setting [{0}] is changing: old value = [{1}].";
        private const string RoleEnvironmentTopologyChangedFormat = "The  topology for the [{0}] role is changed.";
        private const string RoleEnvironmentTopologyChangingFormat = "The  topology for the [{0}] role is changing.";
        private const string RoleInstanceCountFormat = "[Role {0}] instance count = [{1}].";
        private const string RoleInstanceEndpointCountFormat = "[Role {0}] instance endpoints count = [{1}].";
        private const string RoleInstanceEndpointFormat = "[Role {0}] instance endpoint [{1}]: protocol = [{2}] address = [{3}] port = [{4}].";
        private const string RoleInstanceIdFormat = "[Role {0}] instance Id = [{1}].";
        private const string RoleInstanceStatusFormat = "[Role {0}] instance Id = [{1}] Status = [{2}].";
        private const string Unknown = "Unknown";
        private const string RegisteringEventProcessor = "Registering Event Processor [EventProcessor]... ";
        private const string EventProcessorRegistered = "Event Processor [EventProcessor] successfully registered. ";

        //*******************************
        // Settings
        //*******************************
        private const string SqlDatabaseConnectionStringSetting = "SqlDatabaseConnectionString";
        private const string StorageAccountConnectionStringSetting = "StorageAccountConnectionString";
        private const string ServiceBusConnectionStringSetting = "ServiceBusConnectionString";
        private const string EventHubNameSetting = "EventHubName";
        private const string ConsumerGroupNameSetting = "ConsumerGroupName";
        #endregion

        #region Private Fields
        private string eventHubName;
        private string consumerGroupName;
        private string sqlDatabaseConnectionString;
        private string storageAccountConnectionString;
        private string serviceBusConnectionString;
        private EventProcessorHost eventProcessorHost;
        #endregion

        #region Public Methods
        public override void Run()
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();
                
                while (true)
                {
                    Thread.Sleep(10000);
                }
                // ReSharper disable once FunctionNeverReturns
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message, 
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        public override bool OnStart()
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Set Default values for the ServicePointManager
                ServicePointManager.DefaultConnectionLimit = Environment.ProcessorCount * 120;
                ServicePointManager.UseNagleAlgorithm = false;
                ServicePointManager.Expect100Continue = false;

                // Setting RoleEnvironment event handlers
                RoleEnvironment.Changed += RoleEnvironment_Changed;
                RoleEnvironment.Changing += RoleEnvironment_Changing;
                RoleEnvironment.StatusCheck += RoleEnvironment_StatusCheck;
                RoleEnvironment.Stopping += RoleEnvironment_Stopping;

                // Read Configuration Settings
                ReadConfigurationSettings();

                // Start Event Processor
                StartEventProcessorAsync().Wait();

                // Run base.OnStart method
                return base.OnStart();
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
                return false;
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }
        #endregion

        #region Private Methods

        private void ReadConfigurationSettings()
        {
            // Read sql database connectionstring setting  
            sqlDatabaseConnectionString = CloudConfigurationHelper.GetSetting(SqlDatabaseConnectionStringSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                        SqlDatabaseConnectionStringSetting,
                                                        sqlDatabaseConnectionString));

            // Read storage account connectionstring setting  
            storageAccountConnectionString = CloudConfigurationHelper.GetSetting(StorageAccountConnectionStringSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                        StorageAccountConnectionStringSetting,
                                                        storageAccountConnectionString));

            // Read service bus connectionstring setting  
            serviceBusConnectionString = CloudConfigurationHelper.GetSetting(ServiceBusConnectionStringSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                        ServiceBusConnectionStringSetting,
                                                        serviceBusConnectionString));
            
            // Read event hub name setting
            eventHubName = CloudConfigurationHelper.GetSetting(EventHubNameSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                        EventHubNameSetting,
                                                        eventHubName));

            // Read event consumer group name setting
            consumerGroupName = CloudConfigurationHelper.GetSetting(ConsumerGroupNameSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                        ConsumerGroupNameSetting,
                                                        consumerGroupName));
        }

        private async Task StartEventProcessorAsync()
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();
                var eventHubClient = EventHubClient.CreateFromConnectionString(serviceBusConnectionString, eventHubName);

                // Create the EventProcessorHost for the configured event hub and consumer group
                eventProcessorHost = new EventProcessorHost(RoleEnvironment.CurrentRoleInstance.Id,
                                                            eventHubClient.Path.ToLower(),
                                                            consumerGroupName.ToLower(),
                                                            serviceBusConnectionString,
                                                            storageAccountConnectionString)
                {
                    PartitionManagerOptions = new PartitionManagerOptions
                    {
                        AcquireInterval = TimeSpan.FromSeconds(10), // Default is 10 seconds
                        RenewInterval = TimeSpan.FromSeconds(10), // Default is 10 seconds
                        LeaseInterval = TimeSpan.FromSeconds(30) // Default value is 30 seconds
                    }
                };
                TraceEventSource.Log.TraceInfo(RegisteringEventProcessor);
                var eventProcessorOptions = new EventProcessorOptions
                {
                    InvokeProcessorAfterReceiveTimeout = true,
                    MaxBatchSize = 100,
                    PrefetchCount = 100,
                    ReceiveTimeOut = TimeSpan.FromSeconds(30),
                };
                eventProcessorOptions.ExceptionReceived += eventProcessorOptions_ExceptionReceived;
                await eventProcessorHost.RegisterEventProcessorFactoryAsync(new EventProcessorFactory<EventProcessor>(sqlDatabaseConnectionString),
                                                                            eventProcessorOptions);
                TraceEventSource.Log.TraceInfo(EventProcessorRegistered);
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        void eventProcessorOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
        {
            if (e == null || e.Exception == null)
            {
                return;
            }

            // Trace Exception
            TraceEventSource.Log.TraceError(e.Exception.Message,
                                            e.Exception.InnerException != null ?
                                            e.Exception.InnerException.Message :
                                            string.Empty);
        }

        /// <summary>
        /// Occurs after a change to the service configuration is applied to the running instances of a role.
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">Represents the arguments for the Changed event, which occurs after a configuration change has been applied to a role instance.</param>
        private static void RoleEnvironment_Changed(object sender, RoleEnvironmentChangedEventArgs e)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Get the list of configuration setting changes
                var settingChanges = e.Changes.OfType<RoleEnvironmentConfigurationSettingChange>();

                foreach (var settingChange in settingChanges)
                {
                    var value = CloudConfigurationHelper.GetSetting(settingChange.ConfigurationSettingName);
                    TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentConfigurationSettingChangedFormat,
                                                                            settingChange.ConfigurationSettingName ?? string.Empty,
                                                                            string.IsNullOrEmpty(value) ? string.Empty : value));
                }

                // Get the list of topology changes
                var topologyChanges = e.Changes.OfType<RoleEnvironmentTopologyChange>();

                foreach (var roleName in topologyChanges.Select(topologyChange => topologyChange.RoleName))
                {
                    TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentTopologyChangedFormat,string.IsNullOrEmpty(roleName) ? Unknown : roleName));
                    if (string.IsNullOrEmpty(roleName))
                    {
                        continue;
                    }
                    var role = RoleEnvironment.Roles[roleName];
                    if (role == null)
                    {
                        continue;
                    }
                    TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceCountFormat, roleName, role.Instances.Count));
                    foreach (var roleInstance in role.Instances)
                    {
                        TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceIdFormat, roleName, roleInstance.Id));
                        TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceEndpointCountFormat, roleName, roleInstance.InstanceEndpoints.Count));
                        foreach (var instanceEndpoint in roleInstance.InstanceEndpoints)
                        {
                            TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceEndpointFormat,
                                                                            roleName,
                                                                            instanceEndpoint.Key,
                                                                            instanceEndpoint.Value.Protocol,
                                                                            instanceEndpoint.Value.IPEndpoint.Address,
                                                                            instanceEndpoint.Value.IPEndpoint.Port));
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        /// <summary>
        /// Occurs before a change to the service configuration is applied to the running instances of a role. 
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">Represents the arguments for the Changing event, which occurs before a configuration change is applied to a role instance. </param>
        private static void RoleEnvironment_Changing(object sender, RoleEnvironmentChangingEventArgs e)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Get the list of configuration setting changes
                var settingChanges = e.Changes.OfType<RoleEnvironmentConfigurationSettingChange>();

                foreach (var settingChange in settingChanges)
                {
                    var value = CloudConfigurationHelper.GetSetting(settingChange.ConfigurationSettingName);
                    TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentConfigurationSettingChangingFormat,
                                                                    settingChange.ConfigurationSettingName,
                                                                    string.IsNullOrEmpty(value) ? string.Empty : value));
                }

                // Get the list of topology changes
                var topologyChanges = e.Changes.OfType<RoleEnvironmentTopologyChange>();

                foreach (var roleName in topologyChanges.Select(topologyChange => topologyChange.RoleName))
                {
                    TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentTopologyChangingFormat,
                                                                    string.IsNullOrEmpty(roleName) ? Unknown : roleName));
                    if (string.IsNullOrEmpty(roleName))
                    {
                        continue;
                    }
                    var role = RoleEnvironment.Roles[roleName];
                    if (role == null)
                    {
                        continue;
                    }
                    TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceCountFormat, roleName, role.Instances.Count));
                    foreach (var roleInstance in role.Instances)
                    {
                        TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceIdFormat, roleName, roleInstance.Id));
                        TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceEndpointCountFormat, roleName, roleInstance.InstanceEndpoints.Count));
                        foreach (var instanceEndpoint in roleInstance.InstanceEndpoints)
                        {
                            TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceEndpointFormat,
                                                                            roleName,
                                                                            instanceEndpoint.Key,
                                                                            instanceEndpoint.Value.Protocol,
                                                                            instanceEndpoint.Value.IPEndpoint.Address,
                                                                            instanceEndpoint.Value.IPEndpoint.Port));
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        /// <summary>
        /// Occurs at a regular interval to indicate the status of a role instance. 
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">Represents the arguments for the StatusCheck event, which occurs at a regular interval to indicate the status of a role instance.</param>
        private static void RoleEnvironment_StatusCheck(object sender, RoleInstanceStatusCheckEventArgs e)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Write Role Instance Status
                TraceEventSource.Log.TraceInfo(string.Format(RoleInstanceStatusFormat,
                                                                RoleEnvironment.CurrentRoleInstance.Role.Name,
                                                                RoleEnvironment.CurrentRoleInstance.Id,
                                                                e.Status));
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        /// <summary>
        /// Occurs when a role instance is about to be stopped. 
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">Represents the arguments for the Stopping event, which occurs when a role instance is being stopped. </param>
        private static void RoleEnvironment_Stopping(object sender, RoleEnvironmentStoppingEventArgs e)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message,
                                                   ex.InnerException != null ?
                                                   ex.InnerException.Message :
                                                   string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }
        #endregion
    }
}
The following table contains the code of the EventProcessor class. In particular, the ProcessEventsAsync method writes events to the Azure SQL Database in batch mode by invoking the sp_InsertEvents stored procedure with a DataTable object as the table-valued input parameter.
#region Using Directives
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using Microsoft.AzureCat.Samples.Entities;
using Microsoft.AzureCat.Samples.Helpers;
#endregion

namespace Microsoft.AzureCat.Samples.EventProcessorHostWorkerRole
{
    public class EventProcessor : IEventProcessor
    {
        #region Private Constants
        //*******************************
        // Messages & Formats
        //*******************************
        private const string RoleEnvironmentSettingFormat = "Configuration Setting [{0}] = [{1}].";

        //*******************************
        // Settings
        //*******************************
        private const string SqlDatabaseConnectionStringSetting = "SqlDatabaseConnectionString";

        //*******************************
        // Columns & Commands & Parameters
        //*******************************
        private const string EventId = "EventId";
        private const string DeviceId = "DeviceId";
        private const string Value = "Value";
        private const string Timestamp = "Timestamp";
        private const string InsertEventsStoreProcedure = "sp_InsertEvents";
        private const string EventsParameter = "@Events";
        private const string TableType = "EventTableType";
        #endregion

        #region Private Fields
        private readonly string sqlDatabaseConnectionString;
        #endregion

        #region Public Constructors

        public EventProcessor()
        {
            // Read SQL database connection string setting
            sqlDatabaseConnectionString = CloudConfigurationHelper.GetSetting(SqlDatabaseConnectionStringSetting);
            TraceEventSource.Log.TraceInfo(string.Format(RoleEnvironmentSettingFormat,
                                                         SqlDatabaseConnectionStringSetting,
                                                         sqlDatabaseConnectionString));
        }

        public EventProcessor(string sqlDatabaseConnectionString)
        {
            this.sqlDatabaseConnectionString = sqlDatabaseConnectionString;
        }
        #endregion

        #region IEventProcessor Methods
        public Task OpenAsync(PartitionContext context)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Trace Open Partition
                TraceEventSource.Log.OpenPartition(context.EventHubPath,
                                                   context.ConsumerGroupName,
                                                   context.Lease.PartitionId);
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message, 
                                                ex.InnerException != null ?
                                                ex.InnerException.Message :
                                                string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
            return Task.FromResult<object>(null);
        }

        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                if (events == null)
                {
                    return;
                }
                var eventDataList = events as IList<EventData> ?? events.ToList();

                // Trace Process Events
                TraceEventSource.Log.ProcessEvents(context.EventHubPath,
                                                   context.ConsumerGroupName,
                                                   context.Lease.PartitionId,
                                                   eventDataList.Count);

                using (var sqlConnection = new SqlConnection(sqlDatabaseConnectionString))
                {
                    await sqlConnection.OpenAsync();

                    var table = new DataTable();

                    // Add columns to the table
                    table.Columns.Add(EventId, typeof(int));
                    table.Columns.Add(DeviceId, typeof(int));
                    table.Columns.Add(Value, typeof(int));
                    table.Columns.Add(Timestamp, typeof(DateTime));

                    // Add rows to the table
                    foreach (var eventData in eventDataList)
                    {
                        // Note: EventData is disposable
                        using (eventData)
                        {
                            var data = DeserializeEventData(eventData);
                            table.Rows.Add(data.EventId, data.DeviceId, data.Value, data.Timestamp);
                        }
                    }

                    // Create command
                    var sqlCommand = new SqlCommand(InsertEventsStoreProcedure, sqlConnection)
                    {
                        CommandType = CommandType.StoredProcedure
                    };

                    // Add table-valued parameter
                    sqlCommand.Parameters.Add(new SqlParameter
                        {
                            ParameterName = EventsParameter,
                            SqlDbType = SqlDbType.Structured,
                            TypeName = TableType,
                            Value = table,
                        });

                    // Execute the query
                    await sqlCommand.ExecuteNonQueryAsync();
                }
                await context.CheckpointAsync();
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message, 
                                                ex.InnerException != null ?
                                                ex.InnerException.Message :
                                                string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }

        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            try
            {
                // TraceIn
                TraceEventSource.Log.TraceIn();

                // Trace Close Partition
                TraceEventSource.Log.ClosePartition(context.EventHubPath,
                                                    context.ConsumerGroupName,
                                                    context.Lease.PartitionId,
                                                    reason.ToString());

                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }
            catch (Exception ex)
            {
                // Trace Exception
                TraceEventSource.Log.TraceError(ex.Message, 
                                                ex.InnerException != null ?
                                                ex.InnerException.Message :
                                                string.Empty);
            }
            finally
            {
                // TraceOut
                TraceEventSource.Log.TraceOut();
            }
        }
        #endregion

        #region Private Static Methods
        private static Payload DeserializeEventData(EventData eventData)
        {
            return JsonConvert.DeserializeObject<Payload>(Encoding.UTF8.GetString(eventData.GetBytes()));
        }
        #endregion
    }
}
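
The table-building step of ProcessEventsAsync can be separated from the SQL call, which makes the shape of the table-valued parameter easier to check on its own. The following is a minimal sketch, not the sample's actual code: SamplePayload and EventTableBuilder are hypothetical names introduced here for illustration, and the column names and types simply mirror the ones used in the listing above.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical stand-in for the Payload entity (assumption: only the four
// fields used by the EventProcessor listing are needed here).
public class SamplePayload
{
    public int EventId { get; set; }
    public int DeviceId { get; set; }
    public int Value { get; set; }
    public DateTime Timestamp { get; set; }
}

// Hypothetical helper that builds the DataTable passed as the table-valued parameter.
public static class EventTableBuilder
{
    public static DataTable Build(IEnumerable<SamplePayload> payloads)
    {
        var table = new DataTable();

        // Same columns, in the same order, as in ProcessEventsAsync.
        table.Columns.Add("EventId", typeof(int));
        table.Columns.Add("DeviceId", typeof(int));
        table.Columns.Add("Value", typeof(int));
        table.Columns.Add("Timestamp", typeof(DateTime));

        // One row per event; the whole table is later sent in a single round trip.
        foreach (var payload in payloads)
        {
            table.Rows.Add(payload.EventId, payload.DeviceId, payload.Value, payload.Timestamp);
        }
        return table;
    }
}
```

The resulting DataTable would then be assigned to a SqlParameter with SqlDbType.Structured, as in the listing above. The column order in the DataTable is expected to match the column order of the table type defined in the database.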

StoreEventsToAzureSqlDatabase

This project defines the cloud service hosting the worker role. The following table contains the service definition file of the cloud service.
<?xml version="1.0" encoding="utf-8"?>
<ServiceDefinition name="StoreEventsToAzureSqlDatabase" 
                   xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition" 
                   schemaVersion="2014-06.2.4">
  <WorkerRole name="EventProcessorHostWorkerRole" vmsize="Small">
    <ConfigurationSettings>
      <Setting name="SqlDatabaseConnectionString" />
      <Setting name="StorageAccountConnectionString" />
      <Setting name="ServiceBusConnectionString" />
      <Setting name="EventHubName" />
      <Setting name="ConsumerGroupName" />
    </ConfigurationSettings>
    <Imports>
      <Import moduleName="RemoteAccess" />
      <Import moduleName="RemoteForwarder" />
    </Imports>
  </WorkerRole>
</ServiceDefinition>

The following table contains the service configuration file of the cloud service. Make sure to substitute the placeholders with the expected information before deploying the cloud service to Azure.
<?xml version="1.0" encoding="utf-8"?>
<ServiceConfiguration serviceName="StoreEventsToAzureSqlDatabase" 
                      xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration" 
                      osFamily="4" 
                      osVersion="*" 
                      schemaVersion="2014-06.2.4">
  <Role name="EventProcessorHostWorkerRole">
    <Instances count="2" />
    <ConfigurationSettings>
      <Setting name="SqlDatabaseConnectionString" value="[AZURE SQL DATABASE CONNECTION STRING]" />
      <Setting name="StorageAccountConnectionString" value="[STORAGE ACCOUNT CONNECTION STRING]" />
      <Setting name="ServiceBusConnectionString" value="[SERVICE BUS CONNECTION STRING]" />
      <Setting name="EventHubName" value="[EVENT HUB NAME]" />
      <Setting name="ConsumerGroupName" value="[CONSUMER GROUP NAME]" />
      <Setting name="Microsoft.WindowsAzure.Plugins.RemoteAccess.Enabled" value="true" />
      <Setting name="Microsoft.WindowsAzure.Plugins.RemoteAccess.AccountUsername" value="..." />
      <Setting name="Microsoft.WindowsAzure.Plugins.RemoteAccess.AccountEncryptedPassword" value="..." />
      <Setting name="Microsoft.WindowsAzure.Plugins.RemoteAccess.AccountExpiration" value="..." />
      <Setting name="Microsoft.WindowsAzure.Plugins.RemoteForwarder.Enabled" value="true" />
    </ConfigurationSettings>
    <Certificates>
      <Certificate name="Microsoft.WindowsAzure.Plugins.RemoteAccess.PasswordEncryption" 
                   thumbprint="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" 
                   thumbprintAlgorithm="sha1" />
    </Certificates>
  </Role>
</ServiceConfiguration>

To configure diagnostics, and in particular ETW Logs, for the worker role, proceed as follows: right-click the role and select Properties. On the Configuration property page, make sure Enable Diagnostics is checked and click the Configure… button. In the Diagnostics configuration dialog, go to the ETW Logs tab and select Enable transfer of ETW logs. Add the event sources to transfer by specifying each event source name and clicking the Add Event Source button. In the sample, make sure to select the TraceEventSource as shown in the picture below.

Then, right-click the new row and select the Configure event storage... menu item. In the Storage Configuration dialog, specify the name of the default storage table and, optionally, the name of the target table for each EventId defined in the EventSource class. In this sample, all diagnostic messages generated by the TraceEventSource class are configured to be traced to the WADDiagnosticTable (the prefix WAD is automatically added by Windows Azure Diagnostics), while the logs generated by the EventProcessor class are stored in a separate table called WADEventProcessorTable.

Once added, configure additional properties like the storage location for the logs, the log level, any keyword filters and transfer frequency.
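
The event source name and the event IDs referenced in the dialogs above come from the EventSource class itself. The following is a minimal sketch of such a class, not the sample's TraceEventSource: the class name, the event source name, the event IDs, and the method signatures are all assumptions made for illustration.

```csharp
using System;
using System.Diagnostics.Tracing;

// Hypothetical event source. The Name value is what would be typed into
// the ETW Logs tab as the event source name.
[EventSource(Name = "SampleTraceEventSource")]
public sealed class SampleTraceEventSource : EventSource
{
    public static readonly SampleTraceEventSource Log = new SampleTraceEventSource();

    private SampleTraceEventSource() { }

    // Event 1: general informational message (would go to the default table).
    [Event(1, Level = EventLevel.Informational, Message = "{0}")]
    public void TraceInfo(string message)
    {
        if (IsEnabled())
        {
            WriteEvent(1, message);
        }
    }

    // Event 2: a batch of events was processed. A distinct event ID like this
    // one is what allows routing to a separate storage table.
    [Event(2, Level = EventLevel.Informational, Message = "Partition {0}: {1} events")]
    public void ProcessEvents(string partitionId, int count)
    {
        if (IsEnabled())
        {
            WriteEvent(2, partitionId, count);
        }
    }
}
```

With a class like this, calling SampleTraceEventSource.Log.ProcessEvents("0", 100) would emit event 2, which could be mapped to its own table in the Storage Configuration dialog, while event 1 would fall back to the default table.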

Sender

This application can be used to provision the event hub used by the solution and to simulate a configurable number of devices.

The following table shows the configuration file of the application. Make sure to substitute the placeholders with the expected information before running the application.

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <add key="namespace" value="[SERVICE BUS NAMESPACE]"/>
    <add key="keyName" value="[NAMESPACE LEVEL SAS KEY NAME]"/>
    <add key="keyValue" value="[NAMESPACE LEVEL SAS KEY VALUE]"/>
    <add key="eventHub" value="[EVENT HUB NAME]"/>
    <add key="partitionCount" value="16"/>
    <add key="retentionDays" value="7"/>
    <add key="location" value="Milan"/>
    <add key="deviceCount" value="10"/>
    <add key="eventInterval" value="1"/>
    <add key="minValue" value="20"/>
    <add key="maxValue" value="50"/>
  </appSettings>
  <system.serviceModel>
    <extensions>
      <!-- In this extension section we are introducing all known service bus extensions. User can remove the ones they don't need. -->
      <behaviorExtensions>
        <add name="connectionStatusBehavior"
          type="Microsoft.ServiceBus.Configuration.ConnectionStatusElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="transportClientEndpointBehavior"
          type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="serviceRegistrySettings"
          type="Microsoft.ServiceBus.Configuration.ServiceRegistrySettingsElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </behaviorExtensions>
      <bindingElementExtensions>
        <add name="netMessagingTransport"
          type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus,  Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="tcpRelayTransport"
          type="Microsoft.ServiceBus.Configuration.TcpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpRelayTransport"
          type="Microsoft.ServiceBus.Configuration.HttpRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="httpsRelayTransport"
          type="Microsoft.ServiceBus.Configuration.HttpsRelayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="onewayRelayTransport"
          type="Microsoft.ServiceBus.Configuration.RelayedOnewayTransportElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingElementExtensions>
      <bindingExtensions>
        <add name="basicHttpRelayBinding"
          type="Microsoft.ServiceBus.Configuration.BasicHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="webHttpRelayBinding"
          type="Microsoft.ServiceBus.Configuration.WebHttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="ws2007HttpRelayBinding"
          type="Microsoft.ServiceBus.Configuration.WS2007HttpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netTcpRelayBinding"
          type="Microsoft.ServiceBus.Configuration.NetTcpRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netOnewayRelayBinding"
          type="Microsoft.ServiceBus.Configuration.NetOnewayRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netEventRelayBinding"
          type="Microsoft.ServiceBus.Configuration.NetEventRelayBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
        <add name="netMessagingBinding"
          type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingExtensions>
    </extensions>
  </system.serviceModel>
</configuration>
The following table contains the code of the MainForm class. Spend a few minutes analyzing the code of the btnStart_Click method. This method creates the event hub if it doesn't exist and creates the SendKey used to create SAS tokens for individual devices. Then the code creates a separate Task for each device. Each Task starts sending events using the selected transport (AMQP or HTTPS).
#region Using Directives
using System;
using System.Configuration;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Globalization;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using System.Windows.Forms;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.AzureCat.Samples.Entities;
#endregion

namespace Microsoft.AzureCat.Samples.Sender
{
    public partial class MainForm : Form
    {
        #region Private Constants
        //***************************
        // Formats
        //***************************
        private const string DateFormat = "<{0,2:00}:{1,2:00}:{2,2:00}> {3}";
        private const string ExceptionFormat = "Exception: {0}";
        private const string InnerExceptionFormat = "InnerException: {0}";
        private const string LogFileNameFormat = "WADTablesCleaner {0}.txt";

        //***************************
        // Constants
        //***************************
        private const string SaveAsTitle = "Save Log As";
        private const string SaveAsExtension = "txt";
        private const string SaveAsFilter = "Text Documents (*.txt)|*.txt";
        private const string Start = "Start";
        private const string Stop = "Stop";
        private const string SenderSharedAccessKey = "SenderSharedAccessKey";
        private const string DeviceId = "id";
        private const string DeviceName = "name";
        private const string DeviceLocation = "location";
        private const string Value = "value";

        //***************************
        // Configuration Parameters
        //***************************
        private const string NamespaceParameter = "namespace";
        private const string KeyNameParameter = "keyName";
        private const string KeyValueParameter = "keyValue";
        private const string EventHubParameter = "eventHub";
        private const string LocationParameter = "location";
        private const string PartitionCountParameter = "partitionCount";
        private const string RetentionDaysParameter = "retentionDays";
        private const string DeviceCountParameter = "deviceCount";
        private const string EventIntervalParameter = "eventInterval";
        private const string MinValueParameter = "minValue";
        private const string MaxValueParameter = "maxValue";
        private const string ApiVersion = "&api-version=2014-05";

        //***************************
        // Default Values
        //***************************
        private const string DefaultEventHubName = "SampleEventHub";
        private const int DefaultDeviceNumber = 10;
        private const int DefaultMinValue = 20;
        private const int DefaultMaxValue = 50;
        private const int DefaultEventIntervalInSeconds = 1;


        //***************************
        // Messages
        //***************************
        private const string NamespaceCannonBeNull = "The Service Bus namespace cannot be null.";
        private const string EventHubNameCannonBeNull = "The event hub name cannot be null.";
        private const string KeyNameCannonBeNull = "The senderKey name cannot be null.";
        private const string KeyValueCannonBeNull = "The senderKey value cannot be null.";
        private const string EventHubCreatedOrRetrieved = "Event hub [{0}] created or retrieved.";
        private const string MessagingFactoryCreated = "Device[{0,3:000}]. MessagingFactory created.";
        private const string SasToken = "Device[{0,3:000}]. SAS Token created.";
        private const string EventHubClientCreated = "Device[{0,3:000}]. EventHubClient created: Path=[{1}].";
        private const string HttpClientCreated = "Device[{0,3:000}]. HttpClient created: BaseAddress=[{1}].";
        private const string EventSent = "Device[{0,3:000}]. Message sent. PartitionKey=[{1}] Value=[{2}]";
        private const string SendFailed = "Device[{0,3:000}]. Message send failed: [{1}]";
        #endregion

        #region Private Fields
        private CancellationTokenSource cancellationTokenSource;
        private int eventId;
        #endregion

        #region Public Constructor
        /// <summary>
        /// Initializes a new instance of the MainForm class.
        /// </summary>
        public MainForm()
        {
            InitializeComponent();
            ConfigureComponent();
            ReadConfiguration();
        }
        #endregion

        #region Public Methods

        public void ConfigureComponent()
        {
            txtNamespace.AutoSize = false;
            txtNamespace.Size = new Size(txtNamespace.Size.Width, 24);
            txtKeyName.AutoSize = false;
            txtKeyName.Size = new Size(txtKeyName.Size.Width, 24);
            txtKeyValue.AutoSize = false;
            txtKeyValue.Size = new Size(txtKeyValue.Size.Width, 24);
            txtEventHub.AutoSize = false;
            txtEventHub.Size = new Size(txtEventHub.Size.Width, 24);
        }

        public void HandleException(Exception ex)
        {
            if (ex == null || string.IsNullOrEmpty(ex.Message))
            {
                return;
            }
            WriteToLog(string.Format(CultureInfo.CurrentCulture, ExceptionFormat, ex.Message));
            if (ex.InnerException != null && !string.IsNullOrEmpty(ex.InnerException.Message))
            {
                WriteToLog(string.Format(CultureInfo.CurrentCulture, InnerExceptionFormat, ex.InnerException.Message));
            }
        }
        #endregion

        #region Private Methods
        public static bool IsJson(string item)
        {
            if (item == null)
            {
                throw new ArgumentNullException("item", "The item argument cannot be null.");
            }
            try
            {
                var obj = JToken.Parse(item);
                return obj != null;
            }
            catch (Exception)
            {
                return false;
            }
        }

        public string IndentJson(string json)
        {
            if (string.IsNullOrWhiteSpace(json))
            {
                return null;
            }
            dynamic parsedJson = JsonConvert.DeserializeObject(json);
            return JsonConvert.SerializeObject(parsedJson, Formatting.Indented);
        }

        private void ReadConfiguration()
        {
            try
            {
                txtNamespace.Text = ConfigurationManager.AppSettings[NamespaceParameter];
                txtKeyName.Text = ConfigurationManager.AppSettings[KeyNameParameter];
                txtKeyValue.Text = ConfigurationManager.AppSettings[KeyValueParameter];
                txtEventHub.Text = ConfigurationManager.AppSettings[EventHubParameter] ?? DefaultEventHubName;
                var eventHubDescription = new EventHubDescription(txtEventHub.Text);
                int value;
                var setting = ConfigurationManager.AppSettings[PartitionCountParameter];
                txtPartitionCount.Text = int.TryParse(setting, out value) ?
                                       value.ToString(CultureInfo.InvariantCulture) :
                                       eventHubDescription.PartitionCount.ToString(CultureInfo.InvariantCulture);
                setting = ConfigurationManager.AppSettings[RetentionDaysParameter];
                txtMessageRetentionInDays.Text = int.TryParse(setting, out value) ?
                                       value.ToString(CultureInfo.InvariantCulture) :
                                       eventHubDescription.MessageRetentionInDays.ToString(CultureInfo.InvariantCulture);
                txtLocation.Text = ConfigurationManager.AppSettings[LocationParameter];
                setting = ConfigurationManager.AppSettings[DeviceCountParameter];
                txtDeviceCount.Text = int.TryParse(setting, out value) ? 
                                       value.ToString(CultureInfo.InvariantCulture) : 
                                       DefaultDeviceNumber.ToString(CultureInfo.InvariantCulture);
                setting = ConfigurationManager.AppSettings[EventIntervalParameter];
                txtEventIntervalInSeconds.Text = int.TryParse(setting, out value) ?
                                       value.ToString(CultureInfo.InvariantCulture) :
                                       DefaultEventIntervalInSeconds.ToString(CultureInfo.InvariantCulture);
                setting = ConfigurationManager.AppSettings[MinValueParameter];
                txtMinValue.Text = int.TryParse(setting, out value) ?
                                       value.ToString(CultureInfo.InvariantCulture) :
                                       DefaultMinValue.ToString(CultureInfo.InvariantCulture);
                setting = ConfigurationManager.AppSettings[MaxValueParameter];
                txtMaxValue.Text = int.TryParse(setting, out value) ?
                                       value.ToString(CultureInfo.InvariantCulture) :
                                       DefaultMaxValue.ToString(CultureInfo.InvariantCulture);
            }
            catch (Exception ex)
            {
                HandleException(ex);
            }
        }

        private void WriteToLog(string message)
        {
            if (InvokeRequired)
            {
                Invoke(new Action<string>(InternalWriteToLog), new object[] { message });
            }
            else
            {
                InternalWriteToLog(message);
            }
        }

        private void InternalWriteToLog(string message)
        {
            lock (this)
            {
                if (string.IsNullOrEmpty(message))
                {
                    return;
                }
                var lines = message.Split('\n');
                var now = DateTime.Now;
                var space = new string(' ', 19);

                for (var i = 0; i < lines.Length; i++)
                {
                    if (i == 0)
                    {
                        var line = string.Format(DateFormat,
                                                 now.Hour,
                                                 now.Minute,
                                                 now.Second,
                                                 lines[i]);
                        lstLog.Items.Add(line);
                    }
                    else
                    {
                        lstLog.Items.Add(space + lines[i]);
                    }
                }
                lstLog.SelectedIndex = lstLog.Items.Count - 1;
                lstLog.SelectedIndex = -1;
            }
        }

        #endregion

        #region Event Handlers

        private void exitToolStripMenuItem_Click(object sender, EventArgs e)
        {
            Close();
        }

        private void clearLogToolStripMenuItem_Click(object sender, EventArgs e)
        {
            lstLog.Items.Clear();
        }

        /// <summary>
        /// Saves the log to a text file
        /// </summary>
        /// <param name="sender">MainForm object</param>
        /// <param name="e">System.EventArgs parameter</param>
        private void saveLogToolStripMenuItem_Click(object sender, EventArgs e)
        {
            try
            {
                if (lstLog.Items.Count <= 0)
                {
                    return;
                }
                saveFileDialog.Title = SaveAsTitle;
                saveFileDialog.DefaultExt = SaveAsExtension;
                saveFileDialog.Filter = SaveAsFilter;
                saveFileDialog.FileName = string.Format(LogFileNameFormat, DateTime.Now.ToString(CultureInfo.CurrentUICulture).Replace('/', '-').Replace(':', '-'));
                if (saveFileDialog.ShowDialog() != DialogResult.OK || 
                    string.IsNullOrEmpty(saveFileDialog.FileName))
                {
                    return;
                }
                using (var writer = new StreamWriter(saveFileDialog.FileName))
                {
                    foreach (var t in lstLog.Items)
                    {
                        writer.WriteLine(t as string);
                    }
                }
            }
            catch (Exception ex)
            {
                HandleException(ex);
            }
        }

        private void logWindowToolStripMenuItem_Click(object sender, EventArgs e)
        {
            splitContainer.Panel2Collapsed = !((ToolStripMenuItem)sender).Checked;
        }

        private void aboutToolStripMenuItem_Click(object sender, EventArgs e)
        {
            var form = new AboutForm();
            form.ShowDialog();
        }

        private void lstLog_Leave(object sender, EventArgs e)
        {
            lstLog.SelectedIndex = -1;
        }

        private void button_MouseEnter(object sender, EventArgs e)
        {
            var control = sender as Control;
            if (control != null)
            {
                control.ForeColor = Color.White;
            }
        }

        private void button_MouseLeave(object sender, EventArgs e)
        {
            var control = sender as Control;
            if (control != null)
            {
                control.ForeColor = SystemColors.ControlText;
            }
        }
        
        private void MainForm_Paint(object sender, PaintEventArgs e)
        {
            var width = (mainHeaderPanel.Size.Width - 48)/2;
            var halfWidth = (width - 16)/2;

            txtNamespace.Size = new Size(width, txtNamespace.Size.Height);
            txtKeyName.Size = new Size(width, txtKeyName.Size.Height);
            txtKeyValue.Size = new Size(width, txtKeyValue.Size.Height);
            txtEventHub.Size = new Size(width, txtEventHub.Size.Height);
            txtLocation.Size = new Size(width, txtLocation.Size.Height);
            txtPartitionCount.Size = new Size(halfWidth, txtPartitionCount.Size.Height);
            txtMessageRetentionInDays.Size = new Size(halfWidth, txtMessageRetentionInDays.Size.Height);
            txtDeviceCount.Size = new Size(halfWidth, txtDeviceCount.Size.Height);
            txtEventIntervalInSeconds.Size = new Size(halfWidth, txtEventIntervalInSeconds.Size.Height);
            txtMinValue.Size = new Size(halfWidth, txtMinValue.Size.Height);
            txtMaxValue.Size = new Size(halfWidth, txtMaxValue.Size.Height);

            txtEventHub.Location = new Point(32 + width, txtEventHub.Location.Y);
            txtKeyValue.Location = new Point(32 + width, txtKeyValue.Location.Y);
            txtLocation.Location = new Point(32 + width, txtLocation.Location.Y);
            txtMessageRetentionInDays.Location = new Point(32 + halfWidth, txtMessageRetentionInDays.Location.Y);
            txtEventIntervalInSeconds.Location = new Point(32 + halfWidth, txtEventIntervalInSeconds.Location.Y);
            txtMinValue.Location = new Point(32 + width, txtMinValue.Location.Y);
            txtMaxValue.Location = new Point(48 + width + halfWidth, txtMaxValue.Location.Y);

            lblEventHub.Location = new Point(32 + width, lblEventHub.Location.Y);
            lblKeyValue.Location = new Point(32 + width, lblKeyValue.Location.Y);
            lblLocation.Location = new Point(32 + width, lblLocation.Location.Y);
            lblMessageRetentionInDays.Location = new Point(32 + halfWidth, lblMessageRetentionInDays.Location.Y);
            lblEventIntervalInSeconds.Location = new Point(32 + halfWidth, lblEventIntervalInSeconds.Location.Y);
            lblMinValue.Location = new Point(32 + width, lblMinValue.Location.Y);
            lblMaxValue.Location = new Point(48 + width + halfWidth, lblMaxValue.Location.Y);
            radioButtonHttps.Location = new Point(32 + halfWidth, radioButtonAmqp.Location.Y);
        }

        private void MainForm_Shown(object sender, EventArgs e)
        {
            txtNamespace.SelectionLength = 0;
        }

        private async void btnStart_Click(object sender, EventArgs e)
        {
            try
            {
                if (string.Compare(btnStart.Text, Start, StringComparison.OrdinalIgnoreCase) == 0)
                {
                    // Validate parameters before switching the button to the Stop state
                    if (!ValidateParameters())
                    {
                        return;
                    }

                    // Change button text
                    btnStart.Text = Stop;

                    // Create namespace manager
                    var namespaceUri = ServiceBusEnvironment.CreateServiceUri("sb", txtNamespace.Text, string.Empty);
                    var tokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(txtKeyName.Text, txtKeyValue.Text);
                    var namespaceManager = new NamespaceManager(namespaceUri, tokenProvider);

                    // Check if the event hub already exists, if not, create the event hub.
                    var eventHubDescription = await namespaceManager.EventHubExistsAsync(txtEventHub.Text) ?
                                              await namespaceManager.GetEventHubAsync(txtEventHub.Text) :
                                              await namespaceManager.CreateEventHubAsync(new EventHubDescription(txtEventHub.Text)
                                              {
                                                  PartitionCount = txtPartitionCount.IntegerValue,
                                                  MessageRetentionInDays = txtMessageRetentionInDays.IntegerValue
                                              });
                    WriteToLog(string.Format(EventHubCreatedOrRetrieved, txtEventHub.Text));

                    // Check if the SAS authorization rule used by devices to send events to the event hub already exists, if not, create the rule.
                    var authorizationRule = eventHubDescription.
                                            Authorization.
                                            FirstOrDefault(r => string.Compare(r.KeyName, 
                                                                                SenderSharedAccessKey, 
                                                                                StringComparison.InvariantCultureIgnoreCase) 
                                                                                == 0) as SharedAccessAuthorizationRule;
                    if (authorizationRule == null)
                    {
                        authorizationRule = new SharedAccessAuthorizationRule(SenderSharedAccessKey, 
                                                                                 SharedAccessAuthorizationRule.GenerateRandomKey(), 
                                                                                 new[]
                                                                                 {
                                                                                     AccessRights.Send
                                                                                 });
                        eventHubDescription.Authorization.Add(authorizationRule);
                        await namespaceManager.UpdateEventHubAsync(eventHubDescription);
                    }
                    
                    cancellationTokenSource = new CancellationTokenSource();
                    var serviceBusNamespace = txtNamespace.Text;
                    var eventHubName = txtEventHub.Text;
                    var senderKey = authorizationRule.PrimaryKey;
                    var location = txtLocation.Text;
                    var eventInterval = txtEventIntervalInSeconds.IntegerValue * 1000;
                    var minValue = txtMinValue.IntegerValue;
                    var maxValue = txtMaxValue.IntegerValue;
                    var cancellationToken = cancellationTokenSource.Token;

                    // Create one task for each device
                    for (var i = 1; i <= txtDeviceCount.IntegerValue; i++)
                    {
                        var deviceId = i;
                        #pragma warning disable 4014
                        Task.Run(async () =>
                        {
                            var deviceName = string.Format("device{0:000}", deviceId);
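                            // Mix the device id into the seed so that tasks started in the same tick do not share a random sequence
                            var random = new Random(unchecked((int)DateTime.Now.Ticks) ^ deviceId);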

                            if (radioButtonAmqp.Checked)
                            {
                                // The token has the following format: 
                                // SharedAccessSignature sr={URI}&sig={HMAC_SHA256_SIGNATURE}&se={EXPIRATION_TIME}&skn={KEY_NAME}
                                var token = CreateSasTokenForAmqpSender(SenderSharedAccessKey,
                                                                        senderKey,
                                                                        serviceBusNamespace,
                                                                        eventHubName,
                                                                        deviceName,
                                                                        TimeSpan.FromDays(1));
                                WriteToLog(string.Format(SasToken, deviceId));

                                var messagingFactory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", serviceBusNamespace, ""), new MessagingFactorySettings
                                {
                                    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(token),
                                    TransportType = TransportType.Amqp
                                });
                                WriteToLog(string.Format(MessagingFactoryCreated, deviceId));

                                // Each device uses a different publisher endpoint: [EventHub]/publishers/[PublisherName]
                                var eventHubClient = messagingFactory.CreateEventHubClient(String.Format("{0}/publishers/{1}", eventHubName, deviceName));
                                WriteToLog(string.Format(EventHubClientCreated, deviceId, eventHubClient.Path));

                                while (!cancellationToken.IsCancellationRequested)
                                {
                                    // Create random value
                                    var value = random.Next(minValue, maxValue + 1);

                                    // Create EventData object with the payload serialized in JSON format 
                                    using (var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Payload
                                    {
                                        // eventId is shared by all device tasks, so increment it atomically
                                        EventId = Interlocked.Increment(ref eventId) - 1,
                                        DeviceId = deviceId,
                                        Value = value,
                                        Timestamp = DateTime.UtcNow
                                    })))
                                    {
                                        PartitionKey = deviceName
                                    })
                                    {
                                        // Create custom properties
                                        eventData.Properties.Add(DeviceId, deviceId);
                                        eventData.Properties.Add(DeviceName, deviceName);
                                        eventData.Properties.Add(DeviceLocation, location);
                                        eventData.Properties.Add(Value, value);

                                        // Send the event to the event hub
                                        await eventHubClient.SendAsync(eventData);
                                        WriteToLog(string.Format(EventSent, deviceId, deviceName, value));
                                    }

                                    // Wait for the event time interval without blocking a thread pool thread
                                    await Task.Delay(eventInterval);
                                }
                            }
                            else
                            {
                                // The token has the following format: 
                                // SharedAccessSignature sr={URI}&sig={HMAC_SHA256_SIGNATURE}&se={EXPIRATION_TIME}&skn={KEY_NAME}
                                var token = CreateSasTokenForHttpsSender(SenderSharedAccessKey,
                                                                         senderKey,
                                                                         serviceBusNamespace,
                                                                         eventHubName,
                                                                         deviceName,
                                                                         TimeSpan.FromDays(1));
                                WriteToLog(string.Format(SasToken, deviceId));

                                // Create HttpClient object used to send events to the event hub.
                                var httpClient = new HttpClient
                                {
                                    BaseAddress = new Uri(String.Format("https://{0}.servicebus.windows.net/{1}/publishers/{2}", 
                                                                        serviceBusNamespace, 
                                                                        eventHubName, 
                                                                        deviceName).ToLower())
                                };
                                httpClient.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", token);
                                httpClient.DefaultRequestHeaders.Add("ContentType", "application/json;type=entry;charset=utf-8");
                                WriteToLog(string.Format(HttpClientCreated, deviceId, httpClient.BaseAddress));

                                while (!cancellationToken.IsCancellationRequested)
                                {
                                    // Create random value
                                    var value = random.Next(minValue, maxValue + 1);

                                    // Create HttpContent
                                    var postContent = new ByteArrayContent(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Payload
                                    {
                                        // eventId is shared by all device tasks, so increment it atomically
                                        EventId = Interlocked.Increment(ref eventId) - 1,
                                        DeviceId = deviceId,
                                        Value = value,
                                        Timestamp = DateTime.UtcNow
                                    })));

                                    // Create custom properties
                                    
                                    postContent.Headers.Add(DeviceId, deviceId.ToString(CultureInfo.InvariantCulture));
                                    postContent.Headers.Add(DeviceName, deviceName);
                                    //postContent.Headers.Add(DeviceLocation, location);
                                    postContent.Headers.Add(Value, value.ToString(CultureInfo.InvariantCulture));

                                    try
                                    {
                                        var response = await httpClient.PostAsync(httpClient.BaseAddress + "/messages" + "?timeout=60" + ApiVersion, postContent, cancellationToken);
                                        response.EnsureSuccessStatusCode();
                                        WriteToLog(string.Format(EventSent, deviceId, deviceName, value));
                                    }
                                    catch (HttpRequestException ex)
                                    {
                                        WriteToLog(string.Format(SendFailed, deviceId, ex.Message));
                                    }

                                    // Wait for the event time interval, as in the AMQP branch
                                    await Task.Delay(eventInterval);
                                }
                            }
                        },
                        cancellationToken).ContinueWith(t =>
                        #pragma warning restore 4014
                        {
                            if (t.IsFaulted && t.Exception != null)
                            {
                                HandleException(t.Exception);
                            }
                        }, cancellationToken);
                    }

                }
                else
                {
                    // Change button text
                    btnStart.Text = Start;
                    if (cancellationTokenSource != null)
                    {
                        cancellationTokenSource.Cancel();
                    }
                }
            }
            catch (Exception ex)
            {
                HandleException(ex);
            }
        }

        private bool ValidateParameters()
        {
            if (string.IsNullOrWhiteSpace(txtNamespace.Text))
            {
                WriteToLog(NamespaceCannonBeNull);
                return false;
            }
            if (string.IsNullOrWhiteSpace(txtEventHub.Text))
            {
                WriteToLog(EventHubNameCannonBeNull);
                return false;
            }
            if (string.IsNullOrWhiteSpace(txtKeyName.Text))
            {
                WriteToLog(KeyNameCannonBeNull);
                return false;
            }
            if (string.IsNullOrWhiteSpace(txtKeyValue.Text))
            {
                WriteToLog(KeyValueCannonBeNull);
                return false;
            }
            return true;
        }

        public static string CreateSasTokenForAmqpSender(string senderKeyName, 
                                                         string senderKey, 
                                                         string serviceNamespace, 
                                                         string hubName, 
                                                         string publisherName, 
                                                         TimeSpan tokenTimeToLive)
        {
            // This is the format of the publisher endpoint. Each device uses a different publisher endpoint.
            // sb://<NAMESPACE>.servicebus.windows.net/<EVENT_HUB_NAME>/publishers/<PUBLISHER_NAME>. 
            var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", 
                                                                    serviceNamespace, 
                                                                    String.Format("{0}/publishers/{1}", 
                                                                                   hubName, 
                                                                                   publisherName))
                .ToString()
                .Trim('/');
            // SharedAccessSignature sr=<URL-encoded-resourceURI>&sig=<URL-encoded-signature-string>&se=<expiry-time-in-seconds-since-the-Unix-epoch>&skn=<senderKeyName>
            return SharedAccessSignatureTokenProvider.GetSharedAccessSignature(senderKeyName, senderKey, serviceUri, tokenTimeToLive);
        }

        // Create a SAS token for a specified scope. SAS tokens are described in http://msdn.microsoft.com/en-us/library/windowsazure/dn170477.aspx.
        private static string CreateSasTokenForHttpsSender(string senderKeyName, 
                                                           string senderKey, 
                                                           string serviceNamespace, 
                                                           string hubName, 
                                                           string publisherName, 
                                                           TimeSpan tokenTimeToLive)
        {
            // Set token lifetime. When supplying a device with a token, you might want to use a longer expiration time.
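            // The expiry is expressed in seconds since the Unix epoch. TotalSeconds is required here:
            // TimeSpan.Seconds returns only the seconds component, which is 0 for a whole number of days.
            var origin = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
            var difference = DateTime.UtcNow - origin;
            var tokenExpirationTime = Convert.ToInt64(difference.TotalSeconds + tokenTimeToLive.TotalSeconds);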

            // https://<NAMESPACE>.servicebus.windows.net/<EVENT_HUB_NAME>/publishers/<PUBLISHER_NAME>. 
            var uri = ServiceBusEnvironment.CreateServiceUri("https", serviceNamespace, String.Format("{0}/publishers/{1}", hubName, publisherName))
                .ToString()
                .Trim('/');
            var stringToSign = HttpUtility.UrlEncode(uri) + "\n" + tokenExpirationTime;
            var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(senderKey));

            var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

            // SharedAccessSignature sr=<URL-encoded-resourceURI>&sig=<URL-encoded-signature-string>&se=<expiry-time-in-seconds-since-the-Unix-epoch>&skn=<senderKeyName>
            var token = String.Format(CultureInfo.InvariantCulture, "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
            HttpUtility.UrlEncode(uri), HttpUtility.UrlEncode(signature), tokenExpirationTime, senderKeyName);
            return token;
        }
        #endregion
    }
}


Comments (12)

  1. Andy says:

    Thanks, this is very helpful.

    Should there be a timer so that CheckpointAsync isn't called for every message? Doesn't this potentially run up very high Azure bills by constantly polling the storage?

    Thanks.

  2. Hi Andy

    The answer is: it depends on what level of reliability you want to implement in your code and how many messages you read every time the ProcessEventsAsync method is invoked. If you invoke the Checkpoint or CheckpointAsync method every time you receive a message, as in this sample, you persist the offset of the latest processed message to the partition blob lease. If the current EventProcessor fails, this reduces the likelihood that the EventProcessor that takes over ownership of the partition will have to reprocess the same events. However, this comes at a cost: invoking the Checkpoint method implies a write operation to a blob, and this can definitely slow down event processing. I developed other samples that use the technique you mentioned: instead of executing the Checkpoint method whenever it receives a new message, the EventProcessor executes the Checkpoint every k minutes, where k is a parameter passed in the constructor. This approach is faster, as the write operation for the partition is performed only once in a while, but it is less reliable. I hope I answered your question.

    Ciao

    Paolo
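
The time-based checkpointing described in the reply above could be sketched roughly as follows. This is a minimal illustration, not code from the sample: the `CheckpointThrottle` class and its injectable clock are hypothetical names introduced here, and the sketch only decides when a checkpoint is due; it does not call the Event Hubs API itself.

```csharp
using System;

// Hypothetical helper (not part of the original sample): reports when at least
// `interval` has elapsed since the last checkpoint was taken.
public sealed class CheckpointThrottle
{
    private readonly TimeSpan interval;
    private readonly Func<DateTime> clock;
    private DateTime lastCheckpoint;

    // The clock is injectable only so the logic can be exercised without waiting;
    // it defaults to DateTime.UtcNow.
    public CheckpointThrottle(TimeSpan interval, Func<DateTime> clock = null)
    {
        if (interval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException("interval");
        }

        this.interval = interval;
        this.clock = clock ?? (() => DateTime.UtcNow);
        lastCheckpoint = this.clock();
    }

    // Returns true at most once per interval and restarts the interval when it does.
    public bool ShouldCheckpoint()
    {
        var now = clock();
        if (now - lastCheckpoint < interval)
        {
            return false;
        }

        lastCheckpoint = now;
        return true;
    }
}
```

Inside ProcessEventsAsync, after a batch has been stored successfully, the processor would then call `context.CheckpointAsync()` only when `ShouldCheckpoint()` returns true. The trade-off is the one described above: fewer blob writes, but after a failure the next owner of the partition may reprocess up to one interval's worth of events.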

  3. Charles says:

    You mentioned that each device uses a different publisher endpoint: [EventHub]/publishers/[PublisherName]

    If I have 4 categories of devices and each device category sends 3 or 4 types of sensor data, what should the event hub configuration be?

    Do I need to create an event hub for each device category, or can I use one event hub for all categories, with the PublisherName used to identify the specific device instance of a specific category?

    Any help would be appreciated, Thanks

  4. Hi Charles

    Events ingested by an event hub are not strongly typed. In other words, a single event hub can be used to ingest heterogeneous events of different kinds. It's up to the consumer app (in this sample, the EventProcessor class and in particular the ProcessEventsAsync method) to distinguish between different kinds of payload and perform a different action for each event type. You can also use different event hubs for different device categories or payloads if you like, but this is not strictly necessary. Hope this helps.

    Paolo
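
One way a consumer might tell heterogeneous events apart is to have each sender stamp a type name on the event and dispatch on it. The sketch below assumes this convention; the `eventType` property name and the `EventRouter` class are hypothetical and do not appear in the sample. It works on a plain property dictionary of the same shape as `EventData.Properties`, so it carries no SDK dependency.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dispatcher (not part of the original sample): routes a payload
// to the handler registered for its event type.
public sealed class EventRouter
{
    // Assumed convention: senders set this user property on every event.
    public const string EventTypeKey = "eventType";

    private readonly Dictionary<string, Action<string>> handlers =
        new Dictionary<string, Action<string>>(StringComparer.OrdinalIgnoreCase);

    public void Register(string eventType, Action<string> handler)
    {
        if (eventType == null) throw new ArgumentNullException("eventType");
        if (handler == null) throw new ArgumentNullException("handler");
        handlers[eventType] = handler;
    }

    // Returns false when the event carries no type or no handler is registered,
    // so the caller can log or dead-letter the event instead of dropping it silently.
    public bool Route(IDictionary<string, object> properties, string payload)
    {
        object value;
        if (!properties.TryGetValue(EventTypeKey, out value))
        {
            return false;
        }

        var eventType = value as string;
        Action<string> handler;
        if (eventType == null || !handlers.TryGetValue(eventType, out handler))
        {
            return false;
        }

        handler(payload);
        return true;
    }
}
```

In ProcessEventsAsync, each event's properties and decoded body would be passed to `Route`, with one handler registered per sensor data type.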

  5. Charles says:

    Hi Paolo,

    I got it and appreciate your feedback. Yes, it makes sense. Thanks.

  6. Gon says:

    How many messages can a worker role process? We have a similar situation where we plan to process 20K events per second. We'll store them in Azure Table; each event will have a different source and will be persisted in a different table/partition.

  7. Hi Gon

    The answer really depends on many factors:

    – The number of throughput units that you define for your event hub. By default, each throughput unit allows up to 1 MB/sec or 1000 messages/sec in ingress and 2 MB/sec or 2000 messages/sec in egress. Each partition can use at most one throughput unit.

    – The degree of parallelism that you implement in your solution: in this case, it is determined by the number of partitions of your event hub and the total number of worker role instances used to read the stream of data. When you use a consumer group and the EventProcessorHost to read data from an event hub, you can have at most one reader per partition. You therefore reach the maximum degree of parallelism when the number of worker role instances matches the number of partitions. In that case, there's a separate worker role instance reading messages out of each partition.

    – The virtual machine size used for the worker role instance. In this case, the larger the better.

    – The speed limits and constraints of the repository where the worker role will store data. At the time of writing, a single storage account supports up to 20,000 IOPS, entities per second, or messages per second, while a single table supports up to 2000 entities per second.

    – The data sharding/distribution used to persist data. When using storage tables, you need to choose the PartitionKey and RowKey wisely so that data is distributed evenly across multiple partitions, multiple tables and possibly multiple storage accounts.

    Hope this helps!

    Ciao

    Paolo
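
As a rough worked example, the limits quoted in the reply above can be applied to the 20K events per second from the question. The helper below is a hypothetical back-of-the-envelope calculation, not part of the sample. It considers message counts only, ignores the MB/sec limits, and uses the figures as quoted at the time, which may have changed since.

```csharp
using System;

// Hypothetical helper (not part of the original sample): ceiling division used
// to turn a target rate and a per-unit limit into a minimum number of units.
public static class CapacityEstimate
{
    public static int UnitsNeeded(int eventsPerSecond, int eventsPerSecondPerUnit)
    {
        if (eventsPerSecond < 0) throw new ArgumentOutOfRangeException("eventsPerSecond");
        if (eventsPerSecondPerUnit <= 0) throw new ArgumentOutOfRangeException("eventsPerSecondPerUnit");

        // Integer ceiling of eventsPerSecond / eventsPerSecondPerUnit.
        return (eventsPerSecond + eventsPerSecondPerUnit - 1) / eventsPerSecondPerUnit;
    }
}

// With the figures quoted above:
//   CapacityEstimate.UnitsNeeded(20000, 1000)   -> 20 throughput units for ingress
//   CapacityEstimate.UnitsNeeded(20000, 2000)   -> 10 tables, at 2000 entities/sec each
//   CapacityEstimate.UnitsNeeded(20000, 20000)  -> 1 storage account, with no headroom
```

Because each partition can use at most one throughput unit, 20 throughput units also imply at least 20 partitions, and therefore up to 20 worker role instances for full parallelism.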

  8. Rac says:

    I am learning about EventHub, but could not get the package working in VS2015. Will you be compiling for VS2015? Thank you so much for your great example here.

  9. Hi Rac

    If you are using Visual Studio 2015, just install the latest version of the Azure SDK (at the moment, the latest is v2.7). This version is supported by Visual Studio 2015, and when you open the project you are asked to upgrade the SDK version used by the project to the one installed on your machine. Also make sure to upgrade all the NuGet packages used by the application. I just tried both steps and they worked just fine. Good luck!

    Ciao

    Paolo

  10. Rac says:

    Thanks Paolo for your prompt guidance. I did the following:

    1. Convert the project to target Microsoft Azure Tools – v2.7

    2. Click on Manage NuGet packages for solution

    In the top right-hand corner, click "Restore"

    Then filter for all the installed NuGet packages:

    Microsoft.Azure.ServiceBus.EventProcessorHost v2.0.3

    Microsoft.Data.Edm v5.6.4

    Microsoft.Data.OData v5.6.4

    Microsoft.Data.Services.Client v5.6.4

    Microsoft.WindowsAzure.ConfigurationManager v3.1.0

    Newtonsoft.Json v7.0.1

    System.Spatial v5.6.4

    WindowsAzure.ServiceBus v3.0.2

    WindowsAzure.Storage v5.0.2

    3. Select Install for each one of those and apply the update.

    4. Rebuild the solution. I got the following:

    Severity Code Description Project File Line

    Error CS0103 The name 'CloudConfigurationManager' does not exist in the current context Helpers C:\Users\develop\Documents\Visual Studio 2015\Projects\StoreEventHubToAzureSQLDemo\Helpers\CloudConfigurationHelper.cs 41

    Severity Code Description Project File Line

    Warning Could not resolve this reference. Could not locate the assembly "System.Net.Http.Formatting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL". Check to make sure the assembly exists on disk. If this reference is required by your code, you may get compilation errors. Sender

    Severity Code Description Project File Line

    Error Metadata file 'C:\Users\develop\Documents\Visual Studio 2015\Projects\StoreEventHubToAzureSQLDemo\Helpers\bin\Debug\Microsoft.AzureCat.Samples.Helpers.dll' could not be found EventProcessorHostWorkerRole C:\Users\develop\Documents\Visual Studio 2015\Projects\StoreEventHubToAzureSQLDemo\EventProcessorHostWorkerRole\CSC

    Severity Code Description Project File Line

    Error WAT070 : The referenced assembly C:\Users\develop\Documents\Visual Studio 2015\Projects\StoreEventHubToAzureSQLDemo\EventProcessorHostWorkerRole\bin\Debug\Microsoft.AzureCat.Samples.EventProcessorHostWorkerRole.dll was not found. Please make sure to build the role project that produces this assembly before building this Microsoft Azure Cloud Service Project. StoreEventsToAzureSqlDatabase C:\Program Files (x86)\MSBuild\Microsoft\VisualStudio\v14.0\Windows Azure Tools\2.7\Microsoft.WindowsAzure.targets 1329

  11. Hi Rac

    I uploaded a new version to MSDN Code Gallery, updated for VS 2015 and SDK 2.7. I tested it locally, but I didn't have time to deploy and test it on Azure. Sorry about that. I'm busy at the moment and I can't do more than this.

    Ciao

    Paolo

  12. Rac says:

    I really appreciate your assistance Paolo. Thank you so much.
