Pub/Sub with WCF (Part 1)

In yesterday’s post I have explored how to use Service Bus queues as a transport to communicate between a WCF client and a service. In today’s post I will show you WCF pub/sub messaging using the topics and subscriptions. A subscription behaves exactly like a queue for reads while a topic behaves exactly like a queue for writes. This metaphor maps nicely to WCF where our services would listen on different subscriptions while the clients would send messages to a single topic. Service Bus would automatically forward matching messages to correct services.

For this example, I have created a simple order service as shown below. The Write extension simply writes to the console using a certain color assigned to a particular host. I have used this to distinguish the service host receiving the messages.

public class Order
{
    public double Amount { get; set; }
    public string ShipCity { get; set; }
}

[ServiceContract]
public interface IOrderService
{
    [OperationContract(IsOneWay = true)]
    void Submit(Order order);
}

public class OrderService : IOrderService
{
    public void Submit(Order order)
    {
        var writer = OperationContext.Current.Host.Extensions.Find<Writer>();

        writer.WriteLine("Received order value = {0}, ShipCity = {1}", order.Amount, order.ShipCity);
    }
}

Next I use the Management API to create topics & related subscriptions.

var baseAddress = "sb://soundbyte.servicebus.appfabriclabs.com";
var credential = TransportClientCredentialBase.CreateSharedSecretCredential("owner", "zYDbQ2wM4343dbBukWJTF6Y=");
var namespaceClient = new ServiceBusNamespaceClient(baseAddress, credential);

try
{
    namespaceClient.DeleteTopic("orders");
}
catch { }

var topic= namespaceClient.CreateTopic("orders");
topic.AddSubscription("london", new SqlFilterExpression("ShipCity = 'london'"));
topic.AddSubscription("reading", new SqlFilterExpression("ShipCity = 'reading'"));

Both subscriptions has a filter applied to them and only the messages matching this filter would be delivered to the subscription. I then create two separate service hosts simulating separate services running in different cities. I assigned ‘Cyan’ color to the London host (simulated service hosted in London) and ‘Yellow’ to the Reading Host (Service hosted in Reading).

var serviceBusBinding = new ServiceBusMessagingBinding();
serviceBusBinding.MessagingFactorySettings.Credential = credential;

string topicAddress = baseAddress + "/orders";
string londonSubscription = baseAddress + "/orders/subscriptions/london";
string readingSubscription = baseAddress + "/orders/subscriptions/reading";

var hostLondon = new ServiceHost(typeof(OrderService));
hostLondon.Extensions.Add(new Writer(ConsoleColor.Cyan));
hostLondon.AddServiceEndpoint(typeof(IOrderService), serviceBusBinding, new Uri(topicAddress), new Uri(londonSubscription));
hostLondon.Open();

var hostReading = new ServiceHost(typeof(OrderService));
hostReading.Extensions.Add(new Writer(ConsoleColor.Yellow));
hostReading.AddServiceEndpoint(typeof(IOrderService), serviceBusBinding, new Uri(topicAddress), new Uri(readingSubscription));
hostReading.Open();

Next I have created a simple WCF client using the ChannelFactory API and used it to send messages to the topic. In the current CTP, you have to set the filter properties separately on the BrokeredMessage object, which is a native ServiceBus message object and is different from the WCF Message object.

ServiceBusMessagingBinding automatically converts the WCF message object (generated by the proxy object) to a BrokeredMessage object which is then sent to ServiceBus. On the receive side a similar conversion happens from BrokeredMessage to a WCF Message which is then on to WCF Service. To set BrokeredMessage specific properties, we need to create a BrokeredMessageProperty object, set the properties on it and add it to generated WCF messages. ServiceBusMessagingBinding looks for this property and copy all the properties to the BrokeredMessage object it creates. I’m doing exactly the same using the OperationContextScope below.

var cf = new ChannelFactory<IOrderService>(serviceBusBinding, topicAddress);
var proxy = cf.CreateChannel();

using (new OperationContextScope( (IContextChannel)  proxy))
{
    var bmp = new BrokeredMessageProperty();
    bmp.Properties["ShipCity"] = "london";
    OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, bmp);
    proxy.Submit(new Order {Amount = 200, ShipCity = "london"});

    bmp.Properties["ShipCity"] = "reading";
    proxy.Submit(new Order { Amount = 322, ShipCity = "london" });

}

As you can see from the following output that the ‘london’ message was received by London host (‘yellow’) while the ‘reading’ message was received by Reading (‘cyan’) service host.

image

Finally yes it looks bit ugly that I have to specify these promoted/filter properties separately Sad smilefrom their WCF contract values. Ideally ServiceBus APIs should have picked up the ShipCity property from my Order object.  Unfortunately in the current CTP you have to do this separately as I have shown you in the above snippet.  In the next post I’ll show how you can extend WCF so that it automatically picks these ‘filter properties’  from operations parameters or WCF DataContract(s).

Stay tuned…