Kirk Evans Blog

.NET From a Markup Perspective

Device Simulator for Event Hubs

This post will show how to build a simulator to send messages to Azure Event Hubs.

Background

I have been doing a lot of work with Event Hubs stream processing lately, and I find that I need different types of device simulators.  I need one that sends a few messages, one that continuously sends a few messages, one that sends a lot of messages, and so on.  I have probably 10 different console apps that do nothing more than send messages to Event Hubs in a loop, so I decided to make one simulator that fits all my needs.

This code adapts the code found in the Service Bus Event Hubs Getting Started sample.

The application makes it possible to simulate a number of devices sending X number of messages, with a sleep duration between each operation.  You can run a finite number of times, or run continuously, and the application enables the end user to cancel the operation by pressing ENTER.

This is a console application built with .NET, but I could just as easily have created a Java application that uses JMS to send messages.  I showed how to do this in the post Use JMS and Azure Event Hubs with Eclipse.

Reusing Existing Code

I created a library called EventHubDemo.Common that has two classes, MetricEvent and EventHubManager.  Those classes are detailed in my post Scaling Azure Event Hubs Processing with Worker Roles as I use the same code between the sender (this post) and the receiver.

MetricEvent.cs
    using System.Runtime.Serialization;

    namespace EventHubDemo.Common.Contracts
    {
        // The payload each simulated device sends to the Event Hub
        [DataContract]
        public class MetricEvent
        {
            [DataMember]
            public int DeviceId { get; set; }
            [DataMember]
            public int Temperature { get; set; }
        }
    }

EventHubManager.cs
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using System;
    using System.Diagnostics;

    namespace EventHubDemo.Common.Utility
    {
        public class EventHubManager
        {
            // Reads the Service Bus connection string from configuration
            // and forces the AMQP transport
            public static string GetServiceBusConnectionString()
            {
                string connectionString = Microsoft.WindowsAzure.CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

                if (string.IsNullOrEmpty(connectionString))
                {
                    Trace.WriteLine("Did not find Service Bus connection string in appSettings (app.config)");
                    return string.Empty;
                }
                ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
                builder.TransportType = TransportType.Amqp;
                return builder.ToString();
            }

            public static NamespaceManager GetNamespaceManager()
            {
                return NamespaceManager.CreateFromConnectionString(GetServiceBusConnectionString());
            }

            public static NamespaceManager GetNamespaceManager(string connectionString)
            {
                return NamespaceManager.CreateFromConnectionString(connectionString);
            }

            public static void CreateEventHubIfNotExists(string eventHubName, int numberOfPartitions, NamespaceManager manager)
            {
                try
                {
                    // Create the Event Hub
                    Trace.WriteLine("Creating Event Hub…");
                    EventHubDescription ehd = new EventHubDescription(eventHubName);
                    ehd.PartitionCount = numberOfPartitions;
                    manager.CreateEventHubIfNotExistsAsync(ehd).Wait();
                }
                catch (AggregateException agexp)
                {
                    // Wait() wraps failures in an AggregateException
                    Trace.WriteLine(agexp.Flatten());
                }
            }
        }
    }

Creating the Console Application

I created a new console application named Sender.  I added a reference to my EventHubDemo.Common library, and added NuGet packages for Service Bus and Json.NET.

Realizing that others might use this program, I wanted to give a little documentation at runtime.  I created a class MyArgs.cs that is responsible for parsing arguments, providing help, and providing current values.

MyArgs.cs
    using System;
    using System.Text;

    namespace EventHubDemo.Sender
    {
        public class MyArgs
        {
            public MyArgs()
            {
            }

            public string eventHubName { get; set; }

            public int numberOfDevices { get; set; }

            public int numberOfMessages { get; set; }

            public int numberOfPartitions { get; set; }

            public int sleepSeconds { get; set; }

            public int iterations { get; set; }

            // Expects exactly six positional arguments; throws if the count
            // is wrong or if any numeric argument is not a valid integer
            public void ParseArgs(string[] args)
            {
                if (args.Length != 6)
                {
                    throw new ArgumentException("Incorrect number of arguments. Expected 6 args <eventhubname> <NumberOfDevices> <NumberOfMessagesToSend> <NumberOfPartitions> <sleepSeconds> <iterations>", "args");
                }
                else
                {
                    eventHubName = args[0];
                    numberOfDevices = int.Parse(args[1]);
                    numberOfMessages = int.Parse(args[2]);
                    numberOfPartitions = int.Parse(args[3]);
                    sleepSeconds = int.Parse(args[4]);
                    iterations = int.Parse(args[5]);
                }
            }

            internal string GetHelp()
            {
                StringBuilder sb = new StringBuilder();
                sb.AppendLine("Usage: Sender.exe EventHubName NumberOfDevices NumberOfMessagesToSend NumberOfPartitions SleepSeconds Iterations");
                sb.AppendLine();
                sb.AppendLine("Parameters:");
                sb.AppendLine("\tEventHubName:\t\tName of the Event Hub to send messages to");
                sb.AppendLine("\tNumberOfDevices:\t\tNumber of devices to simulate");
                sb.AppendLine("\tNumberOfMessagesToSend:\t\tNumber of messages to send");
                sb.AppendLine("\tNumberOfPartitions:\t\tNumber of Event Hub partitions");
                sb.AppendLine("\tSleepSeconds:\t\tNumber of seconds to sleep between iterations (0 to send as fast as possible)");
                sb.AppendLine("\tIterations:\t\tNumber of iterations (-1 to continuously send)");
                sb.AppendLine();

                return sb.ToString();
            }

            public override string ToString()
            {
                StringBuilder sb = new StringBuilder();
                sb.AppendLine("Event Hub name: " + eventHubName);
                sb.AppendLine("Number of devices: " + numberOfDevices);
                sb.AppendLine("Number of messages: " + numberOfMessages);
                sb.AppendLine("Number of partitions: " + numberOfPartitions);
                sb.AppendLine("Seconds to sleep between iterations: " + sleepSeconds);
                sb.AppendLine("Number of iterations: " + iterations);

                return sb.ToString();
            }
        }
    }
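
ParseArgs above relies on int.Parse, so a mistyped number surfaces only as a generic exception followed by the help text.  As a minimal sketch of an alternative, the helper below uses int.TryParse to report which argument was wrong.  ArgParsing and TryParseCounts are hypothetical names for illustration and are not part of the original MyArgs class.

```csharp
using System;

public static class ArgParsing
{
    // Hypothetical helper, not part of the original MyArgs class.
    // Returns true and fills 'values' when every argument after the
    // Event Hub name is a valid integer; otherwise returns false and
    // sets 'error' to a message describing the problem.
    public static bool TryParseCounts(string[] args, out int[] values, out string error)
    {
        string[] names =
        {
            "NumberOfDevices", "NumberOfMessagesToSend",
            "NumberOfPartitions", "SleepSeconds", "Iterations"
        };
        values = new int[names.Length];
        error = null;

        // args[0] is the Event Hub name, followed by five integers
        if (args == null || args.Length != names.Length + 1)
        {
            error = "Expected " + (names.Length + 1) + " arguments.";
            return false;
        }

        for (int i = 0; i < names.Length; i++)
        {
            if (!int.TryParse(args[i + 1], out values[i]))
            {
                error = names[i] + " must be an integer, but was '" + args[i + 1] + "'.";
                return false;
            }
        }
        return true;
    }
}
```

Main could call this helper before constructing MyArgs and print the error next to the help text, so the user sees exactly which value to correct.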

I then adapted the code from Service Bus Event Hubs Getting Started to create a class responsible for sending messages to Event Hub.

Sender.cs
    using EventHubDemo.Common.Contracts;
    using Microsoft.ServiceBus.Messaging;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;
    using System.Threading.Tasks;

    namespace EventHubDemo.Sender
    {
        public class Sender
        {
            private string eventHubName;
            private int numberOfDevices;
            private int numberOfMessages;

            public Sender(string eventHubName, int numberOfDevices, int numberOfMessages)
            {
                this.eventHubName = eventHubName;
                this.numberOfDevices = numberOfDevices;
                this.numberOfMessages = numberOfMessages;
            }

            public void SendEvents()
            {
                // Create EventHubClient using the connection string
                // from the application configuration
                EventHubClient client = EventHubClient.Create(this.eventHubName);

                try
                {
                    List<Task> tasks = new List<Task>();

                    // Send messages to Event Hub
                    Trace.TraceInformation("Sending messages to Event Hub " + client.Path);

                    Random random = new Random();
                    for (int i = 0; i < this.numberOfMessages; ++i)
                    {
                        // Create the device/temperature metric
                        MetricEvent info = new MetricEvent() { DeviceId = random.Next(numberOfDevices), Temperature = random.Next(100) };
                        var serializedString = JsonConvert.SerializeObject(info);

                        EventData data = new EventData(Encoding.UTF8.GetBytes(serializedString));

                        // Set user properties if needed
                        data.Properties.Add("Type", "Telemetry_" + DateTime.Now.ToLongTimeString());
                        OutputMessageInfo("SENDING", data, info);

                        // Send the metric to Event Hub
                        tasks.Add(client.SendAsync(data));
                    }

                    // Block until every send in this batch has completed
                    Task.WaitAll(tasks.ToArray());
                }
                catch (Exception exp)
                {
                    Trace.TraceError("Error on send: " + exp.Message);
                }
                finally
                {
                    // Close the client whether or not the sends succeeded
                    client.CloseAsync().Wait();
                }
            }

            static void OutputMessageInfo(string action, EventData data, MetricEvent info)
            {
                if (data == null)
                {
                    return;
                }
                if (info != null)
                {
                    Trace.TraceInformation("{0}: Device {1}, Temperature {2}.", action, info.DeviceId, info.Temperature);
                }
            }
        }
    }
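
To make the message body concrete, here is a small sketch of the round trip a receiver would perform on the UTF-8 JSON bytes.  To keep the example free of NuGet dependencies it swaps in the framework's DataContractJsonSerializer instead of the Json.NET call used in Sender.cs, and it repeats the MetricEvent shape so that it compiles on its own.  PayloadDemo, ToBytes, and FromBytes are hypothetical names for illustration.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;

// Same shape as MetricEvent in EventHubDemo.Common, repeated here
// so the sketch is self-contained
[DataContract]
public class MetricEvent
{
    [DataMember]
    public int DeviceId { get; set; }
    [DataMember]
    public int Temperature { get; set; }
}

public static class PayloadDemo
{
    // Serialize a metric to the UTF-8 JSON bytes used as the event body
    public static byte[] ToBytes(MetricEvent info)
    {
        var serializer = new DataContractJsonSerializer(typeof(MetricEvent));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, info);
            return stream.ToArray();
        }
    }

    // Decode an event body back into a metric, as a receiver would
    public static MetricEvent FromBytes(byte[] body)
    {
        var serializer = new DataContractJsonSerializer(typeof(MetricEvent));
        using (var stream = new MemoryStream(body))
        {
            return (MetricEvent)serializer.ReadObject(stream);
        }
    }
}
```

Calling PayloadDemo.FromBytes(PayloadDemo.ToBytes(metric)) should give back the same DeviceId and Temperature, which is a quick way to check the contract before pointing the simulator at a real Event Hub.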

I’ve used variations of this Sender class in a number of ways, with various loops to drive the simulation.  What I finally settled on is calling the Sender class from a little bit of threading code.

Program.cs
    using EventHubDemo.Common.Utility;
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;

    namespace EventHubDemo.Sender
    {
        class Program
        {
            protected ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
            protected CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            private MyArgs a;

            public Program(MyArgs args)
            {
                a = args;
            }

            static void Main(string[] args)
            {
                MyArgs a = new MyArgs();

                try
                {
                    a.ParseArgs(args);
                }
                catch
                {
                    // Bad or missing arguments: show usage and exit
                    Console.WriteLine(a.GetHelp());
                    return;
                }

                Trace.TraceInformation(a.ToString());

                Program p = new Program(a);

                var token = p.cancellationTokenSource.Token;

                // One task sends messages, the other waits for ENTER
                Task.Factory.StartNew(() => p.Run(token, a));
                Task.Factory.StartNew(() => p.WaitForEnter());

                // Block until Run signals that it has finished
                p.runCompleteEvent.WaitOne();
            }

            protected void WaitForEnter()
            {
                Console.WriteLine("Press enter key to stop worker.");
                Console.ReadLine();
                cancellationTokenSource.Cancel();
            }

            private void Run(CancellationToken token, MyArgs a)
            {
                try
                {
                    if (a.iterations == -1)
                    {
                        //Continuously iterate
                        while (!token.IsCancellationRequested)
                        {
                            SendMessages(a);

                            //Convert to milliseconds
                            Thread.Sleep(a.sleepSeconds * 1000);
                        }
                    }
                    else
                    {
                        //Iterate a finite number of times, enabling the user
                        //  to cancel the operation
                        for (int i = 0; i < a.iterations; i++)
                        {
                            if (token.IsCancellationRequested)
                            {
                                break;
                            }

                            SendMessages(a);

                            //Convert to milliseconds
                            Thread.Sleep(a.sleepSeconds * 1000);
                        }
                    }
                }
                finally
                {
                    // Always release Main, even if sending throws
                    runCompleteEvent.Set();
                }
            }

            private static void SendMessages(MyArgs a)
            {
                var namespaceManager = EventHubManager.GetNamespaceManager();
                EventHubManager.CreateEventHubIfNotExists(a.eventHubName, a.numberOfPartitions, namespaceManager);

                Sender s = new Sender(a.eventHubName, a.numberOfDevices, a.numberOfMessages);
                Trace.TraceInformation("Sending events");
                s.SendEvents();
            }
        }
    }

Our program’s entry point is, of course, the static Main method.  We parse the arguments, grab the cancellation token, and start two tasks: one runs the send loop, and the other waits for the user to press ENTER and then cancels the token.  The send loop checks token.IsCancellationRequested to see if the user cancelled the current operation.  Main itself waits on a ManualResetEvent for the signal that the send loop is complete.  This allows us to run for a single iteration and then terminate, or to iterate continuously.

When the Run method is called, we check to see if the user wants to run for a finite number of times or to continuously execute.   In both cases, we enable the user to cancel the operation.

Side note: See Lync and Outlook teams?  The white screen of death is avoidable.

We call the Service Bus library to send messages to the Event Hub, and sleep for X number of seconds between iterations. 
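
One limitation of sleeping with Thread.Sleep is that a cancellation request is only noticed after the current sleep finishes.  As a hedged sketch of an alternative, the loop below waits on the token's WaitHandle instead, so pressing ENTER ends the pause immediately.  CancellableLoop and its Run method are hypothetical names, and the send delegate stands in for the SendMessages call in Program.cs.

```csharp
using System;
using System.Threading;

public static class CancellableLoop
{
    // Runs 'send' up to 'iterations' times (-1 means run until cancelled),
    // pausing 'sleepSeconds' between iterations. Returns the number of
    // iterations that actually ran.
    public static int Run(Action send, int iterations, int sleepSeconds, CancellationToken token)
    {
        int completed = 0;
        while (!token.IsCancellationRequested && (iterations == -1 || completed < iterations))
        {
            send();
            completed++;

            // WaitOne returns true as soon as the token is cancelled,
            // or false once the timeout elapses without cancellation
            if (token.WaitHandle.WaitOne(TimeSpan.FromSeconds(sleepSeconds)))
            {
                break;
            }
        }
        return completed;
    }
}
```

Folding the finite and continuous cases into one loop condition also removes the duplicated branch, at the cost of a slightly denser while expression.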

App.Config

This part is also worth mentioning… we store the connection string for Service Bus and the Azure Storage Account connection string in app.config.  I also added the System.Diagnostics.ConsoleTraceListener so that all Trace output messages appear in the Console window.

Code Snippet
    <appSettings>
      <!-- Service Bus specific app settings for messaging connections -->
      <add key="Microsoft.ServiceBus.ConnectionString"
           value="Endpoint=sb://kirkevans.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED=" />
      <!--<add key="AzureStorageConnectionString" value="UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://127.0.0.1:10000/" />-->
      <add key="AzureStorageConnectionString"
           value="DefaultEndpointsProtocol=https;AccountName=kirkestorage;AccountKey=REDACTED=="/>
    </appSettings>
    <system.diagnostics>
      <trace autoflush="true">
        <listeners>
          <add name="consoleListener"
               type="System.Diagnostics.ConsoleTraceListener"/>
        </listeners>
      </trace>
    </system.diagnostics>

Debugging With Visual Studio

To debug in Visual Studio, you need to provide the command-line arguments.  In case you have not written a console application in a few years: go to the project’s properties and set the command-line arguments on the Debug tab.

The command line arguments I used were:

devicereadings 1000 5 16 1 -1

The Event Hub name is “devicereadings”.  I am simulating 1000 devices and sending 5 messages per batch.  The Event Hub uses 16 partitions, and we sleep for 1 second between batches.  Because the iteration count is -1, the program runs continuously until the user presses ENTER.

The Result

We start the Sender using our desired parameters.

To see if it is actually working, I run a worker role that processes the Event Hub streams in the local compute emulator (details on how to create this are in the post Scaling Azure Event Hubs Processing with Worker Roles).

Summary

This post shows how to create a simple device simulator to send messages to Azure Event Hubs.  I realize that using Console Applications is not the most interesting demo in the world, and it would be nice to do something with this data besides just dumping it to Console output.  We’ll look at that in some upcoming posts.

For More Information

Service Bus Event Hubs Getting Started

Scaling Azure Event Hubs Processing with Worker Roles

Use JMS and Azure Event Hubs with Eclipse