This post will show how to host an Azure Event Hubs EventProcessorHost in a cloud service worker role.
Imagine a scenario where your solution needs to ingest millions of events per second and process those events in near real time. There are many scenarios where this might occur: perhaps you built a solution for a consumer device such as Microsoft Band using the recently announced Microsoft Band SDK preview, you have a popular game that generates telemetry events, or you have an IoT solution that processes events from many devices. Architecting this type of solution poses many challenges. How do you enable ingestion of messages at this scale? More importantly, how do you process the messages in a way that does not create a performance bottleneck? How do you independently scale the message ingestion and message processing capabilities? And most importantly, how do you achieve this for very little cost?
Event Hubs and EventProcessorHost
Azure Event Hubs enables massive scale telemetry processing for solutions that require logging millions of events per second in near real time. It does this by creating a gigantic write-ahead stream where the ingest and processing of data is handled separately. One way to think of this is as a massively scalable queue implementation. However, unlike queues, there is no concept of queue length in Azure Event Hubs because the data is processed as a stream. I wrote a demonstration of the sender side in the post Use JMS and Azure Event Hubs with Eclipse, where many senders send information to an Event Hub named “devicereadings”. The Event Hub then distributes the messages across partitions.
The question is now how to process the events in a scalable manner. That’s exactly what Event Hubs enables, scalable event processing. The Azure Event Hubs team made this processing easy by introducing the EventProcessorHost that enables a partitioned consumer pattern. Consider the following scenario where I have a worker role that is processing all of the stream data from the Event Hub. The EventProcessorHost will automatically create an instance for each partition.
We might see that a single worker role is not sufficient for our solution, that the worker role becomes CPU bound. As we saw in my previous post, Autoscaling Azure–Cloud Services, scaling a worker role creates a new instance of the worker role. The benefit of using the EventProcessorHost is that we can add more worker role instances, and the partitions will be balanced across them. We don’t have to manage the number of processor instances, the EventProcessorHost handles that for us.
Using the EventProcessorHost is simple: you only need to implement one interface, IEventProcessor, and the bulk of the work is in the ProcessEventsAsync method.
Let’s work through building the solution.
Show Me Some Code
I am going to use the Service Bus Event Hubs Getting Started sample as the basis for this post; I made some changes to the code and will explain them along the way. First, I created a class library called EventHubDemo.Common. It contains NuGet references to Json.NET, Microsoft Azure Service Bus, and Windows Azure Configuration Manager.
It contains two classes: MetricEvent.cs and EventHubManager.cs.
MetricEvent is just a class used to (de)serialize JSON data.
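For illustration, MetricEvent might look like the following; the property names here are assumptions for this sketch and should match whatever schema the sender serializes:

```csharp
using Newtonsoft.Json;

namespace EventHubDemo.Common
{
    // Hypothetical shape of a device reading; adjust the properties
    // to match the JSON payload your senders actually emit.
    public class MetricEvent
    {
        [JsonProperty("deviceId")]
        public int DeviceId { get; set; }

        [JsonProperty("temperature")]
        public int Temperature { get; set; }
    }
}
```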
The next class, EventHubManager.cs, is a helper class for working with Event Hubs. It centralizes the common logic, used across multiple projects, for obtaining the Event Hub connection string and creating an Event Hub.
This makes it easy to reuse the logic in a Console application that sends messages or in an Azure worker role that receives them.
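A sketch of what EventHubManager might contain, assuming the setting name Microsoft.ServiceBus.ConnectionString (the connection-string key conventionally used by the Azure SDK samples):

```csharp
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;

namespace EventHubDemo.Common
{
    // Centralizes Event Hub configuration and creation logic so it can be
    // shared by the sender console app and the receiver worker role.
    public static class EventHubManager
    {
        // Reads the Service Bus connection string from role/app configuration.
        public static string GetServiceBusConnectionString()
        {
            return CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        }

        // Creates the Event Hub with the requested partition count if it
        // does not already exist.
        public static void CreateEventHubIfNotExists(
            string eventHubName, int numberOfPartitions, string connectionString)
        {
            var manager = NamespaceManager.CreateFromConnectionString(connectionString);
            var description = new EventHubDescription(eventHubName)
            {
                PartitionCount = numberOfPartitions
            };
            manager.CreateEventHubIfNotExists(description);
        }
    }
}
```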
EventProcessorHost in an Azure Worker Role
The next step is to create a Cloud Service. I create a new Azure Cloud Service project named “EventProcessor” in Visual Studio.
Click OK, and you are prompted for the type of role you want to create. Choose a worker role, and rename it to something like “ReceiverRole”.
Click OK, and you now have two projects: the worker role code and the deployment project.
The bulk of the work will be in the ReceiverRole project. Add a reference to the EventHubDemo.Common library that we created previously. Next add the NuGet package “Microsoft.Azure.ServiceBus.EventProcessorHost”, which will add the required dependencies.
Next we add a class called SimpleEventProcessor.cs. This class will implement the IEventProcessor interface that we mentioned previously. When the OpenAsync method is called by the EventProcessorHost, we will write out the partition that it corresponds to. When a message is received in the ProcessEventsAsync method, we write the message out to Trace output. When the CloseAsync method is called, we write that out to trace as well.
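The steps above can be sketched as follows, using the IEventProcessor interface from the Microsoft.ServiceBus.Messaging namespace (the checkpoint call is a common addition so progress survives a rebalance; your sample may checkpoint differently):

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

namespace ReceiverRole
{
    // The EventProcessorHost creates one instance of this class per partition.
    public class SimpleEventProcessor : IEventProcessor
    {
        public Task OpenAsync(PartitionContext context)
        {
            Trace.TraceInformation("Opened processor for partition {0}",
                context.Lease.PartitionId);
            return Task.FromResult<object>(null);
        }

        public async Task ProcessEventsAsync(
            PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData message in messages)
            {
                Trace.TraceInformation("Partition {0}: {1}",
                    context.Lease.PartitionId,
                    Encoding.UTF8.GetString(message.GetBytes()));
            }

            // Record progress so another host can resume from here
            // if this partition's lease moves to a different instance.
            await context.CheckpointAsync();
        }

        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Trace.TraceInformation("Closed partition {0}, reason: {1}",
                context.Lease.PartitionId, reason);
            return Task.FromResult<object>(null);
        }
    }
}
```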
Easy enough so far. Now add a class, Receiver.cs. This class will encapsulate the logic for registering and unregistering an IEventProcessor implementation.
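A minimal Receiver might look like the following sketch; the method names here are my own, but the EventProcessorHost constructor and registration calls are the standard API:

```csharp
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

namespace ReceiverRole
{
    // Wraps registration and unregistration of the event processor.
    public class Receiver
    {
        private EventProcessorHost host;

        public async Task RegisterAsync(
            string hostName,  // must be unique per role instance
            string eventHubName,
            string serviceBusConnectionString,
            string storageConnectionString)
        {
            this.host = new EventProcessorHost(
                hostName,
                eventHubName,
                EventHubConsumerGroup.DefaultGroupName,
                serviceBusConnectionString,
                storageConnectionString);

            // Starts the host; it acquires partition leases and creates
            // a SimpleEventProcessor per partition it owns.
            await this.host.RegisterEventProcessorAsync<SimpleEventProcessor>();
        }

        public async Task UnregisterAsync()
        {
            if (this.host != null)
            {
                // Releases this host's leases so other instances pick them up.
                await this.host.UnregisterEventProcessorAsync();
            }
        }
    }
}
```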
The final bit is to host this in an Azure cloud service using a worker role. I updated the WorkerRole.cs class.
When the worker role starts, we capture the trace output. The worker role then begins to run, reads its settings from configuration in order to connect to the Event Hub, and registers our event processor. A key point to call out is that we use the host name of the worker role instance as the EventProcessorHost name; this is needed in order to obtain leases for the partitions across multiple instances. Also notice that at the end of the Run method we wait for a signal, which is set when the worker role is stopped. The processing of events is handled on separate threads; we only need the ManualResetEvent to prevent the Run method from completing (which would cause the worker role to recycle).
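Condensed into a sketch, the hosting pattern described above might look like this (the configuration setting names are assumptions for illustration):

```csharp
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace ReceiverRole
{
    public class WorkerRole : RoleEntryPoint
    {
        // Keeps Run() alive while events are processed on background threads.
        private readonly ManualResetEvent completedEvent = new ManualResetEvent(false);
        private readonly Receiver receiver = new Receiver();

        public override void Run()
        {
            string eventHubName = CloudConfigurationManager.GetSetting("EventHubName");
            string serviceBusConnectionString =
                CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            string storageConnectionString =
                CloudConfigurationManager.GetSetting("AzureStorageConnectionString");

            // The role instance ID gives each host a unique name, which is
            // required for partition leases to balance across instances.
            string hostName = RoleEnvironment.CurrentRoleInstance.Id;

            this.receiver.RegisterAsync(
                hostName, eventHubName,
                serviceBusConnectionString, storageConnectionString).Wait();

            // Block here; processing happens on the EventProcessorHost's threads.
            this.completedEvent.WaitOne();
        }

        public override bool OnStart()
        {
            ServicePointManager.DefaultConnectionLimit = 12;
            return base.OnStart();
        }

        public override void OnStop()
        {
            // Unregister so partitions rebalance to the remaining instances,
            // then release Run() so the role can shut down cleanly.
            this.receiver.UnregisterAsync().Wait();
            this.completedEvent.Set();
            base.OnStop();
        }
    }
}
```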
The sample code uses CloudConfigurationManager to obtain settings. Those settings are part of the deployment process, shown in the next section.
Go to the EventProcessor project that defines the deployment configuration for the worker role. Expand the Roles folder and double-click on the ReceiverRole node.
When you double-click, go to the Settings tab. This is where we provide the various configuration values.
Note: It is a best practice to avoid putting secrets such as connection strings in source code. You would typically set these as part of a deployment script. For more information, see Building Real-World Cloud Apps with Azure.
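For illustration, the resulting section of ServiceConfiguration.cscfg might look something like this, with placeholder values rather than real secrets (the setting names are assumptions):

```xml
<ServiceConfiguration serviceName="EventProcessor"
    xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration">
  <Role name="ReceiverRole">
    <Instances count="1" />
    <ConfigurationSettings>
      <Setting name="EventHubName" value="devicereadings" />
      <Setting name="Microsoft.ServiceBus.ConnectionString"
               value="[service bus connection string]" />
      <Setting name="AzureStorageConnectionString"
               value="[storage connection string]" />
    </ConfigurationSettings>
  </Role>
</ServiceConfiguration>
```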
You can also configure diagnostics for the worker roles. We will use the diagnostics capabilities when we test out the solution. On the Configuration tab, click the Configure button.
That allows you to configure the log level for the application. I set it to Verbose.
Now right-click the deployment project and choose Publish. You are prompted to create a cloud service and storage account.
Once created, go through the wizard to complete publishing.
Watch the progress in the Microsoft Azure Activity Log.
Once deployed, you will have a single production instance for your worker role.
Testing It Out
Once the role is deployed, go to Visual Studio and right-click the ReceiverRole node under Cloud Services and choose “View Diagnostics Data”.
On that page, expand the Microsoft Azure application logs section, then click View all data.
You will then see the Azure Storage table named WADLogsTable, which contains the logged events. When the worker role is running, we see the entry “ReceiverRole is running”. The SimpleEventProcessor is registered once, and the EventProcessorHost takes care of creating 16 instances, one per partition (not all log entries are shown here, but there are 16 in the log).
Let’s scale our service. While we could use the autoscale service to scale by CPU, we can see how things work by manually scaling as well. Let’s manually scale by going to the Azure management portal (https://manage.windowsazure.com) and setting the scale for the cloud service to 4.
Save, and wait while the new instances are created and our packages are deployed automatically to the new instances.
Once the 3 new virtual machines are created, we go back to see what happened in the logs. It is really cool to see how the EventProcessorHost automatically balances the processing across the available instances. You can see here that partitions 1, 10, and 12 are handled by instance 2, while partitions 3, 13, and 15 are handled by instance 1.
Let’s scale the number of instances down to 2.
This causes the OnStop() method to be called in each worker role instance that is no longer needed, which runs our code to unregister the event processor. The processing is then rebalanced across the remaining worker role instances. First we see the close operations:
Next we see that the closed partitions are opened using another available processor:
Testing Out the Sender
To test things out, I can use the same client that I built in the post Use JMS and Azure Event Hubs with Eclipse. This is a simple web page that sends messages to the Event Hub using AMQP 1.0. Alternatively, I can modify the code in the Service Bus Event Hubs Getting Started sample to send messages. Once I send messages to the Event Hub, we see them processed in the event log, automatically balanced across partitions among the 2 available worker roles.
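As a sketch, a minimal console sender using the EventHubClient API might look like this (you must supply your own connection string, and MetricEvent's properties are assumptions from the earlier class):

```csharp
using System.Text;
using EventHubDemo.Common;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

// Sends a single serialized MetricEvent to the "devicereadings" Event Hub.
class SenderDemo
{
    static void Main()
    {
        string connectionString = "[service bus connection string]"; // placeholder
        var client = EventHubClient.CreateFromConnectionString(
            connectionString, "devicereadings");

        var reading = new MetricEvent { DeviceId = 1, Temperature = 42 };
        string json = JsonConvert.SerializeObject(reading);

        // EventData wraps the payload; Event Hubs distributes it to a partition.
        client.Send(new EventData(Encoding.UTF8.GetBytes(json)));
        client.Close();
    }
}
```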
Event Hubs makes it very easy to build highly scalable solutions that can process millions of messages per second in near real time. This post showed how you can use an Azure worker role with EventProcessorHost to process the messages and how the load automatically balances across the available processors.
I am not providing the sample code as a download because it is already provided in the post Service Bus Event Hubs Getting Started, and I have listed the code that I used in this post.
For More Information
Building Real-World Cloud Apps with Azure – Free eBook!
Service Bus Event Hubs Getting Started – sample used as the basis of this post