Using Service Bus Topics and Subscriptions with WCF

Introduction

In my last post, I showed how to use Windows Azure AppFabric Service Bus Queues with WCF. Service Bus Queues provide a great mechanism for asynchronous communication between two specific applications or services. However, in complex systems it’s often useful to support this kind of messaging between many applications and services. This is particularly important for systems utilising a “Message Bus” approach, where one application “publishes” events which may be consumed by any number of “subscriber” applications. While it’s possible to simulate this kind of pattern with a simple queuing mechanism, the Service Bus now has built-in support for it through a feature called Topics. A topic is essentially a special kind of queue which allows multiple subscribers to independently receive and process their own copy of each message. Furthermore, each subscription can be filtered, ensuring the subscriber only receives the messages that are of interest.

In this post I will build on the previous Service Bus Queues sample, replacing the queue with a topic. I also include two subscriber services, each of which uses its own subscription attached to the topic. The first subscription is configured to receive all messages from the publisher, while the second contains a filter that only allows messages through if their EventType property is equal to “Test Event”. The basic architecture is shown below, and the full code sample can be downloaded here.

[Figure: ServiceBus architecture diagram]

If you’ve looked through my previous Queues sample, you’ll find that there are very few changes required to support Topics and Subscriptions. The changes are:

  • The application now creates Topics and Subscriptions (including Filters) instead of Queues, using the same configuration-driven helper library
  • The client’s WCF configuration points to the topic URI
  • Each service’s WCF configuration points to the topic URI as the address and the subscription URI as the listenUri
  • The client code has been extended to send metadata along with the message, allowing the subscription filters to act upon the metadata.

Although most of the sample is the same as last time, I’ve included the full description of the sample in this post to prevent readers from needing to reference both posts.

Creating the Topic and Subscriptions

To use the Service Bus, you first need to have a Windows Azure subscription. If you don’t yet have one, you can sign up for a free trial. Once you have an Azure subscription, log into the Windows Azure Portal, navigate to Service Bus, and create a new Service Namespace. You can then create one or more topics directly from the portal, and then create subscriptions under each topic. However, at the time of writing, there’s no way to specify filters when creating subscriptions from the portal, so every subscription created there will receive every message sent to the topic. To access the full power of topics and subscriptions, you should create them programmatically. For my sample I built a small library that lets you define your topics, subscriptions (including filters) and queues in a configuration file so they can be created when needed by the application. Here’s how I created the topic and subscription used for Service Two, which is configured to filter for events where the EventType metadata property is equal to “Test Event”:

  <serviceBusSetup>
    <credentials namespace="{your namespace here}" issuer="owner" key="{your key here}" />
    <topics>
      <add name="topic1" >
        <subscriptions>
          <add name="sub2" createMode="DeleteIfExists" filter="EventType = 'Test Event'" />
        </subscriptions>
      </add>
    </topics>
  </serviceBusSetup>

Note that for any interactions with Service Bus, you’ll need to know your issuer name (“owner” by default) and secret key (a bunch of Base64 gumph), as well as your namespace, all of which can be retrieved from the portal. For my sample, this info needs to go in a couple of places in each configuration file.
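If you would rather create the entities directly than go through a configuration-driven helper, the same topic and filtered subscription can be created with the NamespaceManager class from Microsoft.ServiceBus.dll. The following is only a minimal sketch, not the code from my helper library: the TopicSetup class and its method name are made up for illustration, and I am assuming that createMode="DeleteIfExists" should translate to "delete and recreate the subscription".

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper, shown for illustration only.
public static class TopicSetup
{
    // The same SQL-style filter expression used in the configuration above.
    public const string Sub2Filter = "EventType = 'Test Event'";

    public static void EnsureTopicAndFilteredSubscription(
        string serviceNamespace, string issuerName, string issuerSecret)
    {
        // Builds sb://{namespace}.servicebus.windows.net/
        Uri address = ServiceBusEnvironment.CreateServiceUri(
            "sb", serviceNamespace, string.Empty);
        TokenProvider credentials =
            TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerSecret);
        var manager = new NamespaceManager(address, credentials);

        // Create the topic only if it is not already there.
        if (!manager.TopicExists("topic1"))
        {
            manager.CreateTopic("topic1");
        }

        // Assumed meaning of createMode="DeleteIfExists": drop and recreate.
        if (manager.SubscriptionExists("topic1", "sub2"))
        {
            manager.DeleteSubscription("topic1", "sub2");
        }

        // Only messages whose EventType property matches the filter
        // are delivered to this subscription.
        manager.CreateSubscription("topic1", "sub2", new SqlFilter(Sub2Filter));
    }
}
```

A subscription created without a filter argument, such as sub1 in this sample, receives every message sent to the topic.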

Defining the Contract

As with any WCF service, you need to start with the contract. Queuing technologies are inherently one-way, so you need to use the IsOneWay property on the OperationContract attribute. I chose to use a generic base interface that accepts any payload type, which can be refined for specific concrete payloads. However, if you don’t want to do this, a simple single interface would work just fine.

    [ServiceContract]
    public interface IEventNotification<TLog>
    {
        [OperationContract(IsOneWay = true)]
        void OnEventOccurred(TLog value);
    }

    [ServiceContract]
    public interface IAccountEventNotification : IEventNotification<AccountEventLog>
    {
    }

    [DataContract]
    public class AccountEventLog
    {
        [DataMember]
        public int AccountId { get; set; }

        [DataMember]
        public string EventType { get; set; }

        [DataMember]
        public DateTime Date { get; set; }
    }

Building and Hosting the Services

This sample includes two “subscriber” services, both implemented identically but running in their own IIS applications. The services are implemented exactly the same way as any other WCF service. You could build your own host, but I chose to host the services in IIS via a normal .svc file and associated code-behind class file. For my sample, whenever I receive a message I write a trace message and also store the payload in a list held in a static variable, which is displayed on a simple web page.

    public class Service1 : IAccountEventNotification
    {
        public void OnEventOccurred(AccountEventLog log)
        {
            Trace.WriteLine(String.Format("Service One received event '{0}' for account {1}", 
                log.EventType, log.AccountId));
            Subscriber.ReceivedEvents.Add(log);
        }
    }
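The Subscriber class that holds the received events isn’t shown above. Because the static list can be touched by more than one thread (several messages being processed at once, plus the web page reading it), it’s worth guarding access to it. Here is one possible shape for such a store; the ReceivedEventStore name and its Snapshot method are hypothetical and are not taken from the sample.

```csharp
using System.Collections.Generic;

// Hypothetical thread-safe holder for received payloads.
public sealed class ReceivedEventStore<T>
{
    private readonly object _gate = new object();
    private readonly List<T> _items = new List<T>();

    // Called from the service operation for every message received.
    public void Add(T item)
    {
        lock (_gate)
        {
            _items.Add(item);
        }
    }

    // Returns a copy so that callers (for example the web page) can
    // enumerate the events without holding the lock.
    public IList<T> Snapshot()
    {
        lock (_gate)
        {
            return new List<T>(_items);
        }
    }
}
```

With this in place, Subscriber.ReceivedEvents could simply be a static ReceivedEventStore<AccountEventLog>, and the call to Add in the service stays as it is.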

The magic of wiring this service up to the Service Bus all happens in configuration. First, make sure you’ve downloaded and referenced the latest version of the Microsoft.ServiceBus.dll – NuGet is the easiest way to get this (just search for “WindowsAzure.ServiceBus”).

Now it’s just a matter of telling WCF about the service, specifying the NetMessagingBinding and correct URIs, and configuring your authentication details. Since I haven’t got the SDK installed, the definitions for the bindings are specified directly in my web.config files instead of in machine.config.

The WCF configuration for Service One is shown below. Note that the topic URI is specified in the service endpoint’s address attribute, and the subscription URI is specified in its listenUri attribute.

  <system.serviceModel>
    <!-- These <extensions> will not be needed once the SDK is installed -->
    <extensions>
      <bindingElementExtensions>
        <add name="netMessagingTransport" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingElementExtensions>
      <bindingExtensions>
        <add name="netMessagingBinding" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingExtensions>
      <behaviorExtensions>
        <add name="transportClientEndpointBehavior" type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </behaviorExtensions>
    </extensions>
    <behaviors>
      <endpointBehaviors>
        <behavior name="securityBehavior">
          <transportClientEndpointBehavior>
            <tokenProvider>
              <sharedSecret issuerName="owner" issuerSecret="{your key here}" />
            </tokenProvider>
          </transportClientEndpointBehavior>
        </behavior>
      </endpointBehaviors>
    </behaviors>
    <bindings>
      <netMessagingBinding>
        <binding name="messagingBinding" closeTimeout="00:03:00" openTimeout="00:03:00" 
             receiveTimeout="00:03:00" sendTimeout="00:03:00" sessionIdleTimeout="00:01:00" prefetchCount="-1">
          <transportSettings batchFlushInterval="00:00:01" />
        </binding>
      </netMessagingBinding>
    </bindings>
    <services>
      <service name="ServiceBusPubSub.ServiceOne.Service1">
        <endpoint name="Service1"
                  listenUri="sb://{your namespace here}.servicebus.windows.net/topic1/subscriptions/sub1"
                  address="sb://{your namespace here}.servicebus.windows.net/topic1" 
                  binding="netMessagingBinding" 
                  bindingConfiguration="messagingBinding" 
                  contract="ServiceBusPubSub.Contracts.IAccountEventNotification" 
                  behaviorConfiguration="securityBehavior" />
      </service>
    </services>
  </system.serviceModel>
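Service Two’s configuration is identical apart from the service element, where the listenUri points at the filtered subscription (sub2) instead of sub1. As a sketch, the services section would look something like the fragment below; the service and endpoint names shown here are assumptions that follow the naming used for Service One, so check them against the downloaded sample.

```xml
<services>
  <!-- Service and endpoint names are assumed; only the listenUri differs in substance -->
  <service name="ServiceBusPubSub.ServiceTwo.Service2">
    <endpoint name="Service2"
              listenUri="sb://{your namespace here}.servicebus.windows.net/topic1/subscriptions/sub2"
              address="sb://{your namespace here}.servicebus.windows.net/topic1"
              binding="netMessagingBinding"
              bindingConfiguration="messagingBinding"
              contract="ServiceBusPubSub.Contracts.IAccountEventNotification"
              behaviorConfiguration="securityBehavior" />
  </service>
</services>
```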

One final (but critical) thing to note: Most IIS-hosted WCF services are automatically “woken up” whenever a message arrives. However, this does not happen when working with the Service Bus. In fact, the service only starts listening to the subscription once it is already awake. During development (and with the attached sample) you can wake up the services by manually browsing to the .svc file. However, for production use you’ll obviously need a more resilient solution. For applications hosted on Windows Server, the best solution is to use Windows Server AppFabric to host and warm up the service as documented in this article. If you’re hosting your service in Windows Azure, you’ll need to use a more creative solution to warm up the service, or you could host in a worker role instead of IIS. I’ll try to post more on possible solutions sometime in the near future.
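For development, the manual “browse to the .svc file” step can be scripted. The snippet below is a rough sketch of that idea only: it issues an HTTP GET against a service URL so that IIS activates the application. The ServiceWarmUp class is hypothetical, the URLs you pass in depend on where you deployed the services, and this is not a substitute for a proper warm-up mechanism in production.

```csharp
using System;
using System.Net;

// Hypothetical development-time helper.
public static class ServiceWarmUp
{
    // Requests the given .svc URL; returns true if the server answered 200 OK.
    public static bool TryWakeUp(string svcUrl)
    {
        try
        {
            WebRequest request = WebRequest.Create(svcUrl);
            request.Timeout = 30000; // milliseconds

            using (WebResponse response = request.GetResponse())
            {
                var http = response as HttpWebResponse;
                return http != null && http.StatusCode == HttpStatusCode.OK;
            }
        }
        catch (UriFormatException)
        {
            // The string was not a valid URL.
            return false;
        }
        catch (WebException)
        {
            // Server unreachable, timed out, or returned an error status.
            return false;
        }
    }
}
```

You could call TryWakeUp once for each subscriber service after deployment, or from a scheduled task, passing the address of each .svc file.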

Building the Client

Building the client is much the same as for any other WCF application. I chose to use a ChannelFactory so I could reuse the contract assembly from the services, but any WCF proxy approach should work fine.

The one thing I needed to do differently was explicitly attach some metadata when publishing the event to the topic. This is necessary whenever you want to allow subscribers to filter messages based on this metadata, as is done by Service Two in my sample. As you can see from the code, this is achieved by attaching a BrokeredMessageProperty object to the OperationContext.Current.OutgoingMessageProperties:

    var factory = new ChannelFactory<IAccountEventNotification>("Subscribers");
    var clientChannel = factory.CreateChannel();
    ((IChannel)clientChannel).Open();

    using (new OperationContextScope((IContextChannel)clientChannel))
    {
        // Attach metadata for subscriptions
        var bmp = new BrokeredMessageProperty();
        bmp.Properties["AccountId"] = accountEventLog.AccountId;
        bmp.Properties["EventType"] = accountEventLog.EventType;
        bmp.Properties["Date"] = accountEventLog.Date;
        OperationContext.Current.OutgoingMessageProperties.Add(
            BrokeredMessageProperty.Name, bmp);

        clientChannel.OnEventOccurred(accountEventLog);
    }

    // Close sender
    ((IChannel)clientChannel).Close();
    factory.Close();
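The subscribers in my sample don’t need to look at this metadata, since the filtering happens inside the Service Bus. If a service does want to read it, the BrokeredMessageProperty should be available through the incoming message properties in the same way it was attached to the outgoing ones. The helper below is a sketch under that assumption; the BrokeredMetadata class and its method names are hypothetical.

```csharp
using System.ServiceModel;
using System.ServiceModel.Channels;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper for reading publisher-supplied metadata.
public static class BrokeredMetadata
{
    // Looks up one custom property on the brokered message carried
    // in the given WCF message properties.
    public static bool TryGet(MessageProperties properties, string key, out object value)
    {
        value = null;
        object raw;
        if (properties == null ||
            !properties.TryGetValue(BrokeredMessageProperty.Name, out raw))
        {
            return false;
        }

        var bmp = raw as BrokeredMessageProperty;
        return bmp != null && bmp.Properties.TryGetValue(key, out value);
    }

    // Convenience overload for use inside a service operation.
    public static bool TryGetFromCurrentOperation(string key, out object value)
    {
        value = null;
        OperationContext context = OperationContext.Current;
        return context != null &&
               TryGet(context.IncomingMessageProperties, key, out value);
    }
}
```

Inside OnEventOccurred, a call such as BrokeredMetadata.TryGetFromCurrentOperation("EventType", out eventType) would then return the value the client attached, if there is one.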

Again, the interesting part is the configuration, although it matches the service pretty closely. In the case of the client, you only need to specify the topic’s URI in the endpoint definition since it knows nothing about the subscribers:

  <system.serviceModel>
    <extensions>
      <bindingElementExtensions>
        <add name="netMessagingTransport" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingElementExtensions>
      <bindingExtensions>
        <add name="netMessagingBinding" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </bindingExtensions>
      <behaviorExtensions>
        <add name="transportClientEndpointBehavior" type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
      </behaviorExtensions>
    </extensions>
    <behaviors>
      <endpointBehaviors>
        <behavior name="securityBehavior">
          <transportClientEndpointBehavior>
            <tokenProvider>
              <sharedSecret issuerName="owner" issuerSecret="{your key here}" />
            </tokenProvider>
          </transportClientEndpointBehavior>
        </behavior>
      </endpointBehaviors>
    </behaviors>
    <bindings>
      <netMessagingBinding>
        <binding name="messagingBinding" sendTimeout="00:03:00" receiveTimeout="00:03:00" 
                 openTimeout="00:03:00" closeTimeout="00:03:00" sessionIdleTimeout="00:01:00" 
                 prefetchCount="-1">
          <transportSettings batchFlushInterval="00:00:01" />
        </binding>
      </netMessagingBinding>
    </bindings>
    <client>
      <endpoint name="Subscribers" 
                address="sb://{your namespace here}.servicebus.windows.net/topic1" 
                binding="netMessagingBinding" 
                bindingConfiguration="messagingBinding" 
                contract="ServiceBusPubSub.Contracts.IAccountEventNotification" 
                behaviorConfiguration="securityBehavior" />
    </client>
  </system.serviceModel>

Summary

Windows Azure AppFabric Service Bus makes it easy to build loosely-coupled, event-driven systems using the Topics and Subscriptions capability. While you can program directly against the Service Bus API, using the WCF integration via the NetMessagingBinding allows you to use your existing skills (and possibly leverage existing code) to take advantage of this great capability.

ServiceBusPubSub.zip