Photo Mosaics Part 6: Roles

The last three posts of this series went into some detail on the various storage aspects of the Azure Photo Mosaics application, and now it’s time to examine the processing that is carried out by the three roles (one web and two worker roles).  With the foundation we’ve laid regarding the storage infrastructure, it turns out that the role processing code is quite straightforward.   Let’s take a look, and start by revisiting the architecture diagram I’ve been using throughout this series.

Photo Mosaics architecture highlighting the web and worker roles

As you know from the last post, the primary drivers of the processing are the queues which convey messages among the various roles.  Each role then follows a fairly rote pattern: accept a message, perform the specific task it’s focused on, and queue up a new message for the next step in the workflow.  To summarize, the breakdown below highlights the inputs, outputs, and jobs carried out by each role.

ClientInterface
  Input: job info via WCF Web Service
  Output: imagerequest queue, jobs table, imageinput container, Service Bus
  Tasks performed:
    1. accept request
    2. record request in jobs table
    3. store original image in imageinput container
    4. queue request for JobController
    5. confirm request with client via Service Bus

JobController
  Input: imagerequest queue, imageinput container
  Output: slicerequest queue, sliceinput container
  Tasks performed:
    1. divide original image into multiple slices
    2. store each slice independently in sliceinput container
    3. submit multiple messages to slicerequest queue corresponding to each slice

ImageProcessor
  Input: slicerequest queue, sliceinput container, image library
  Output: sliceresponse queue, sliceoutput container
  Tasks performed:
    1. ‘mosaicize’ slice of original image
    2. store processed slice in blob storage
    3. queue message indicating slice has been processed

JobController (part 2)
  Input: sliceresponse queue, sliceoutput container
  Output: imageresponse message, imageoutput container
  Tasks performed:
    1. determine if all slices of image processed
    2. if so,
      1. assemble complete processed image
      2. queue job completion message
    3. if not, do nothing

JobController (part 3)
  Input: imageresponse message, imageoutput container
  Output: jobs table, Service Bus
  Tasks performed:
    1. acknowledge job completion by
      1. updating job record, and
      2. notifying client via Service Bus
    2. this is also an extension point for potentially additional steps in the workflow, e.g., produce alternative image encodings or sizes

Some key aspects to note here:

  1. Each step is stateless: all of the information needed to process a given step is included in the queue message or in persistent storage (Windows Azure blobs or tables).

  2. Each step is essentially idempotent.  Recall that queues provide guaranteed delivery, and so there is the possibility that a message will get processed multiple times (e.g., a role processing a message is recycled due to a bug or maintenance action, and the message reappears on the queue after the invisibility timeout interval).  For this application, that would mean that an image (or image slice) gets processed multiple times.  The result is extra unnecessary work, but the output (a portion of the manipulated image) would just overwrite itself in the corresponding blob container (a short fragment illustrating this follows the list).

    Note though that the initial job request from the client could be lost in transmission to the ClientInterface role, and there are no retries attempted given the BasicHttpBinding currently used.  Of course, WCF offers additional bindings to support reliable sessions, so the implementation could be upgraded to support that.

  3. The roles are independently scalable.  The ImageProcessor role is the most computationally intensive and the primary candidate for scaling out the application.  That said, if there are a lot of client submissions, additional instances of just the ClientInterface and/or JobController roles could be spun up to handle the load (and then dropped when/if the spike in usage subsides).
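
To make the idempotency point concrete, here’s a minimal C# fragment showing why reprocessing a slice does no harm: the output blob name is derived deterministically from the request, so a second pass over the same message simply overwrites the earlier result.  (The variable names here are stand-ins for the objects you’ll see later in the ImageProcessor listing, and the UploadByteArray call assumes the 1.x StorageClient library.)

// because the blob name depends only on the request, reprocessing the same
// queue message writes to the same blob and overwrites the earlier result
string blobName = string.Format("{0}_{1:000}", requestMsg.BlobName, requestMsg.SliceSequence);
CloudBlob outputBlob = sliceOutputContainer.GetBlobReference(blobName);
outputBlob.UploadByteArray(generatedImageBytes);   // idempotent: no duplicates accumulate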

Ok, time for some code!  As you know, all the source code for the project is available for download, so I won’t replicate it in its entirety here, but I did want to show how the workflow starts with the ClientInterface role’s WCF service as well as provide a representative snippet from one of the worker role implementations to show the queue integration.

Web Role (ClientInterface)

The conversion of an image begins with a request from the client (the Windows Forms application in this case) to a WCF web service named JobBroker that is hosted in the ClientInterface web role.  There’s a single method on that service’s interface, SubmitImageForProcessing, which is shown below.

  2: {
  3:     Uri imageUri = null;
  4:     BlobAccessor blobAccessor = null;
  5:     TableAccessor tableAccessor = null;
  6:  
  7:     try
  8:     {
  9:         String connString = new StorageBrokerInternal().GetStorageConnectionStringForClient(clientId);
  10:         tableAccessor = new TableAccessor(connString);
  11:         blobAccessor = new BlobAccessor(connString);
  12:  
  14:         var job = tableAccessor.CreateJob(clientId,
  15:                                       requestId,
  16:                                       originalLocation,
  17:                                       imageUri,
  18:                                       Utilities.ImageUtilities.GetImageSize(imgBytes),
  19:                                       tileSize,
  20:                                       slices);
  21:  
  22:         if (QueueAccessor.IsAvailable)
  23:         {
  24:             QueueAccessor.ImageRequestQueue.SubmitMessage<ImageRequestMessage>
  25:                  (clientId, requestId, imageUri, imageLibraryUri, tileSize, slices);
  26:  
  27:             tableAccessor.WriteStatusEntry(requestId,
  28:                 String.Format("New request from {0}: {1}", clientId, originalLocation));
  29:  
  30:             try
  31:             {
  32:                 Notifier.SendRequestStartedNotification(
  33:                     clientId,
  34:                     requestId,
  35:                     originalLocation,
  36:                     imageUri,
  37:                     tileSize,
  38:                     job.StartTime);
  39:             }
  40:             catch (Exception e)
  41:             {
  42:                 Trace.TraceWarning("Request: {0}{4}Exception: {1}{4}Message: {2}{4}Trace: {3}",
  43:                     requestId,
  44:                     e.GetType(),
  45:                     e.Message,
  46:                     e.StackTrace,
  47:                     Environment.NewLine);
  48:             }
  49:         }
  50:         else
  51:         {
  52:             throw new SystemException("Application queueing service could not be initialized; request cannot be processed");
  53:         }
  54:     }
  55:     catch (Exception e)
  56:     {
  57:         // clean up if there was a problem
  58:         if (blobAccessor != null) blobAccessor.DeleteImage(imageUri);
  59:  
  60:         Trace.TraceError("Request: {0}{4}Exception: {1}{4}Message: {2}{4}Trace: {3}",
  61:                                     requestId,
  62:                                     e.GetType(),
  63:                                     e.Message,
  64:                                     e.StackTrace,
  65:                                     Environment.NewLine);
  66:  
  67:         // surface exception to client
  68:         throw;
  69:     }
  70: }

Let’s break this down a bit:

Line 1: As you can see, all of the information required for the request is passed in via parameters to the service call – including the byte array for the original image.  To support the image transfer, you’ll note that the web.config contains the following binding options:

 <binding name="JobBrokerBinding" 
         maxReceivedMessageSize="10485760" 
         messageEncoding="Mtom">
  <readerQuotas maxArrayLength="10485760"/>
</binding>

Lines 9-11: Here, the BlobAccessor and TableAccessor are instantiated to provide a point of access to blob and table storage within Windows Azure.  I covered these classes as part of the blog post on the Azure Image Processor storage architecture earlier in this series.

Line 13: The original image is written to blob storage, specifically the imageinput container, and named according to the unique RequestId (a GUID).

Line 14: Information for this request is logged in the jobs table in Windows Azure storage.

Line 22: A test is made to ensure the queuing infrastructure is available.  Specifically, this means that the storage account wrapped by the QueueAccessor static class is accessible.  If not, there’s not much more that can happen, so we bail pretty quickly by throwing an exception in Line 52, which will eventually bubble up to the client.
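
For context, a check like this typically boils down to a single round-trip against the queue service.  Here’s a minimal sketch of what such an availability test might look like – the method name and the use of CreateIfNotExist are my own illustration, not the project’s QueueAccessor code:

using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;

static bool CanReachQueueStorage(string connectionString, string queueName)
{
    try
    {
        CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
        CloudQueue queue = account.CreateCloudQueueClient().GetQueueReference(queueName);
        queue.CreateIfNotExist();   // one round-trip; throws if the queue service is unreachable
        return true;
    }
    catch (StorageClientException)
    {
        return false;               // the caller can then fail fast, as the service code above does
    }
}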

Line 24: Presuming all is well with the queues, a new message is submitted to the ImageRequestQueue (you may want to refer to the post on queuing to see how access to the queues has been abstracted via the generic method SubmitMessage).

Line 27: A message is written to the status table just to indicate the current stage of processing.

Lines 30-48: A call is made to the Windows Azure AppFabric Service Bus to inform the client that the request has been received and forwarded for processing.  I’m reserving another post to talk about the Service Bus usage, so we’ll dig further into that later in this series.

Line 55: Some rudimentary exception handling is included.  It’s not a best practice to catch the base Exception class alone, but to keep focus on the Windows Azure aspects of this application, I left it at that level.  You’d likely want to include specific exception handling cases for the more common things that can go wrong – such as a StorageClientException.
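
As an example of what that might look like, here’s a hedged sketch that layers a StorageClientException handler ahead of the general catch.  The helper method and its parameters are hypothetical; the project’s code keeps the single catch shown above.

using System;
using System.Diagnostics;
using Microsoft.WindowsAzure.StorageClient;

static void RunWithLayeredHandling(Action storageWork, Guid requestId)
{
    try
    {
        storageWork();   // the blob/table/queue calls from the listing above
    }
    catch (StorageClientException se)
    {
        // storage-specific failures (throttling, timeouts, missing containers) get their own path
        Trace.TraceError("Request: {0} Storage error: {1} Message: {2}", requestId, se.ErrorCode, se.Message);
        throw;
    }
    catch (Exception e)
    {
        // everything else falls through to the general handler, as in the original code
        Trace.TraceError("Request: {0} Exception: {1} Message: {2}", requestId, e.GetType(), e.Message);
        throw;
    }
}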

Points of Failure

When building cloud applications, a primary mantra is to expect failure – that’s why we have constructs like queues with reliable messaging.  So let’s see what happens in a few scenarios with the code above if there’s, say, a hardware failure in the middle of processing.  How resilient is the implementation?

1.  We can’t initialize blob or table storage (Lines 9-11).

We’ll get to the exception handling code in Line 55.  There’s an attempt to delete the image from the imageinput container, but in this scenario the image wouldn’t exist yet, so it’s a no-op.  We also push a message to Windows Azure diagnostics via the familiar Trace class (we’ll look at Windows Azure diagnostics in an upcoming post).

2.  There’s a failure writing to blob or table storage (Lines 13 and 14).

We’ll hit the exception handling code in Lines 55ff.  There’s a chance the image bytes have already been transferred to blob storage, so we’ll make an attempt to clean that up, but if that doesn’t happen it’s not a huge deal. There’s certainly an opportunity to have some ‘orphaned’ images in blob storage, which does cost money for storage (15 cents per GB per month).

For a robust production app, you might want to have a process to clean up the imageinput container, perhaps doing a purge every month or so. Alternatively, the application supports these blob containers being part of a Windows Azure storage account owned by the client of the application – versus by the application itself. In that case, it’s not your money that you’re spending, so perhaps it becomes a responsibility of the client of your cloud service?
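
A purge process along those lines could be as simple as the sketch below.  This assumes the 1.x StorageClient library, and the 30-day cutoff (as well as running this as, say, a scheduled task in a worker role) is an arbitrary choice on my part:

using System;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;

static void PurgeOrphanedInputImages(string connectionString)
{
    CloudBlobContainer container = CloudStorageAccount.Parse(connectionString)
        .CreateCloudBlobClient()
        .GetContainerReference("imageinput");

    foreach (var item in container.ListBlobs())
    {
        CloudBlob blob = item as CloudBlob;
        if (blob == null) continue;                 // skip anything that isn't a blob

        blob.FetchAttributes();                     // make sure LastModifiedUtc is populated
        if (blob.Properties.LastModifiedUtc < DateTime.UtcNow.AddDays(-30))
            blob.Delete();                          // assume anything this old is an orphaned original
    }
}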

Note that there is not a similar clean-up attempt on the jobs table.  That was actually a conscious decision.  The storage costs are trivial for the single entry in the table. Beyond that, it records the fact that there was an aborted attempt, versus pretending the request was never submitted, and that may be valuable from a trouble-shooting or SLA-accounting perspective.

3.  Queue initialization fails (Line 22).

If we can’t get to the queuing subsystem, there’s not a lot this application can do, so we throw an exception in Line 52.  This will bubble up through the same exception handling code in Lines 55ff that I referenced earlier.

4.  Service Bus notification fails (Line 32).

As mentioned I’ll cover the details of the Service Bus functionality later, but if the invocation fails, the request is not aborted.  The failure will be logged (Line 42), but since the notification service isn’t really critical functionality the request will still be processed.  

Note though that this could result in a usability issue.  In the Windows client application, the Service Bus message is used to let the end-user know that the process has been kicked off.  If that message does not show up, the user may assume something didn’t work (they will just see ‘request submitted…’ in the UI) and resubmit, kicking off a duplicate request with a new ID.  If the request had actually failed, they would see an exception message instead, but that’s a fairly subtle distinction and not the mark of a good user interface.  A bit of extra work on the UI end could prevent this ambiguity – in fact, a request appears in the Job List window of the client only if it was submitted successfully, so the UI could steer the user to that part of the interface as the ultimate confirmation.

Worker Role (ImageProcessor)

Now that the request is in the system, let’s take a look at one phase of the processing.  I’ll cover the ImageProcessor worker role’s Run method here, but you’ll note at the end of the discussion that all of the message processing in the workflow conforms to a very similar pattern.

  1: Public Overrides Sub Run()
  2:  
  3:     While (True)
  4:  
  5:         ' NEW SLICE REQUEST
  6:         Dim requestMsg As SliceRequestMessage =
  7:             If(QueueAccessor.IsAvailable, QueueAccessor.SliceRequestQueue.AcceptMessage(Of SliceRequestMessage)(), Nothing)
  8:         If requestMsg IsNot Nothing Then
  9:  
  10:             ' check for possible poison message
  11:             If requestMsg.RawMessage.DequeueCount <= requestMsg.Queue.PoisonThreshold Then
  12:  
  13:                 Try
  14:  
  15:                     ' instantiate storage accessors
  16:                     Dim imgLibraryAccessor As BlobAccessor = New BlobAccessor(StorageBroker.GetStorageConnectionStringForAccount(requestMsg.ImageLibraryUri))
  17:                     Dim blobAccessor As BlobAccessor = New BlobAccessor(StorageBroker.GetStorageConnectionStringForAccount(requestMsg.SliceUri))
  18:                     Dim tableAccessor As TableAccessor = New TableAccessor(StorageBroker.GetStorageConnectionStringForClient(requestMsg.ClientId))
  19:  
  20:                     ' use caching? (this is a role property for experimentation, in production you'd likely commit to caching)
  21:                     Dim useCaching As Boolean
  22:                     Try
  23:                         useCaching = CBool(RoleEnvironment.GetConfigurationSettingValue("EnableAppFabricCache"))
  24:                     Catch e As Exception
  25:                         useCaching = False
  26:                     End Try
  27:  
  28:                     ' time the operation
  29:                     Dim sw As New Stopwatch()
  30:                     sw.Start()
  31:  
  32:                     ' build the library
  33:                     Dim library As New ImageLibrary(imgLibraryAccessor,
  34:                                                     requestMsg.ImageLibraryUri,
  35:                                                     requestMsg.TileSize,
  36:                                                     useCaching)
  37:  
  38:                     sw.Stop()
  39:                     tableAccessor.WriteStatusEntry(requestMsg.RequestId,
  40:                              String.Format("Slice {0}: library generation completed in {1}:{2:00}",
  41:                                            requestMsg.SliceSequence, sw.Elapsed.Minutes, sw.Elapsed.Seconds))
  42:                     sw.Start()
  43:  
  44:                     ' generate the image
  45:                     Dim generatedImage = library.Mosaicize(
  46:                         blobAccessor.RetrieveImage(requestMsg.SliceUri),
  47:                         requestMsg.TileSize,
  48:                         Sub(msg As String)
  49:                             tableAccessor.WriteStatusEntry(requestMsg.RequestId,
  50:                                         String.Format("Slice {0}: {1}", requestMsg.SliceSequence, msg))
  51:                         End Sub
  52:                     )
  53:  
  54:                     ' stop the timer
  55:                     sw.Stop()
  56:  
  57:                     tableAccessor.WriteStatusEntry(requestMsg.RequestId,
  58:                              String.Format("Slice {0}: slice generation completed in {1}:{2:00}",
  59:                                            requestMsg.SliceSequence, sw.Elapsed.Minutes, sw.Elapsed.Seconds))
  60:  
  61:                     ' store the processed segment
  62:                     Dim processedSliceUri = blobAccessor.StoreImage(generatedImage, blobAccessor.SliceOutputContainer,
  63:                                                                     String.Format("{0}_{1:000}", requestMsg.BlobName, requestMsg.SliceSequence))
  64:  
  65:                     ' signal completion of the slice
  66:                     QueueAccessor.SliceResponseQueue.SubmitMessage(Of SliceResponseMessage)(requestMsg.ClientId, requestMsg.RequestId, processedSliceUri)
  67:  
  68:                     ' message has been processed
  69:                     requestMsg.DeleteFromQueue()
  70:  
  71:                 Catch e As Exception
  72:  
  73:                     Dim tableAccessor As TableAccessor = New TableAccessor(StorageBroker.GetStorageConnectionStringForClient(requestMsg.ClientId))
  74:                     tableAccessor.WriteStatusEntry(requestMsg.RequestId,
  75:                         String.Format("Slice {0}: An unrecoverable exception has occurred, consult diagnostic logs for details.",
  76:                                       requestMsg.SliceSequence))
  77:  
  78:                     ' something really bad happened!
  79:                     Trace.TraceError("Request: {0}{4}Exception: {1}{4}Message: {2}{4}Trace: {3}",
  80:                         requestMsg.RequestId,
  81:                         e.GetType(),
  82:                         e.Message,
  83:                         e.StackTrace,
  84:                         Environment.NewLine)
  85:                 End Try
  86:             Else
  87:  
  88:                     ' remove poison message from the queue
  89:                     requestMsg.DeleteFromQueue()
  90:                     Trace.TraceWarning("Request: {0}{4}Exception: {1}{4}Message: {2}{4}Trace: {3}",
  91:                             requestMsg.RequestId,
  92:                             "N/A",
  93:                             String.Format("Possible poison message [id: {0}] removed from queue '{1}' after {2} attempts",
  94:                                 requestMsg.RawMessage.Id,
  95:                                 QueueAccessor.SliceRequestQueue,
  96:                                 QueueAccessor.SliceRequestQueue.PoisonThreshold),
  97:                             "N/A",
  98:                             Environment.NewLine)
  99:             End If
  100:         Else
  101:             Thread.Sleep(10000)
  102:         End If
  103:     End While
  104: End Sub

Here’s a breakdown of what the code above does:

Lines 6-7:   The SliceRequestQueue is polled to see if there are any messages (of message type SliceRequestMessage) pending.  If there are not, there’s nothing really to do and control drops way down to Line 101, where the thread sleeps for 10 seconds before looping back around to check again if a message has arrived.

Consider a more sophisticated polling strategy.   In Windows Azure, you are charged a fee for every storage transaction (priced as of this writing at 1 cent per 10,000 transactions).   Every time the queue is polled, you have executed one storage transaction – even if there’s nothing in the queue!  If your application isn’t consistently busy, you’re accruing charges that aren’t really producing any work.  A common implementation involves using an exponential backoff algorithm.  Essentially, you extend the time between polls whenever the queue shows empty.   So, the first time you find the queue empty, perhaps you have the code wait 10 seconds before checking again.  If the queue is empty after 10 seconds, you check again after 20 seconds, then 40, and so on.  Once a message shows up and you process it, you set the delay back to 10 seconds.  Of course, you have to cap this as well, because you don’t want the first request after a lull to end up waiting days to be processed because of the exponential growth of your delay factor!
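
Here’s a minimal sketch of that back-off approach against a raw CloudQueue.  The 10-second floor and 5-minute cap are arbitrary, and the process delegate stands in for whatever work the role performs:

using System;
using System.Threading;
using Microsoft.WindowsAzure.StorageClient;

static void PollWithBackoff(CloudQueue queue, Action<CloudQueueMessage> process)
{
    TimeSpan delay = TimeSpan.FromSeconds(10);       // starting interval
    TimeSpan maxDelay = TimeSpan.FromMinutes(5);     // cap so the first post-lull message isn't stranded

    while (true)
    {
        CloudQueueMessage msg = queue.GetMessage();  // each poll is a billable storage transaction
        if (msg != null)
        {
            process(msg);
            queue.DeleteMessage(msg);
            delay = TimeSpan.FromSeconds(10);        // reset as soon as there's real work again
        }
        else
        {
            Thread.Sleep(delay);
            delay = TimeSpan.FromSeconds(            // double the wait, but never beyond the cap
                Math.Min(delay.TotalSeconds * 2, maxDelay.TotalSeconds));
        }
    }
}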

Line 11: At this point we have a message in hand, so the first test is to check whether we’ve seen this exact message before, and specifically how many times we’ve seen it.  As you’ll recall, a message can reappear on the queue if it has not been explicitly deleted by the role that dequeued it, so if something bad happens while it’s being processed, the application gets another shot at it.  That works well if there’s just a glitch – for instance, the VM for that role happens to be brought down to apply a patch in the window between when the role retrieves the message and when it completes the work and goes to delete it.  But what if there’s something inherently wrong with the message – say there’s corrupt data in the message that causes the role processing it to crash every time?  If that happens, the message will just keep reappearing to be doomed once again – an Azure manifestation of Groundhog Day!

To accommodate this ‘poisoned message’ scenario, you can establish a threshold of how many times you’ll dequeue the same message before abandoning all hope and deleting it.  In the case of the Azure Photo Mosaic application, I made this threshold value a metadata property of the queue itself versus hardcoding it (cf. this series’ post on Azure queues).  What you see in Line 11 is the logic to check the message’s DequeueCount property against this threshold value.  If that value is exceeded, the message is permanently deleted (Line 89) and a message logged via Windows Azure diagnostics.
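
For illustration, with the 1.x StorageClient library (where CloudQueue.Metadata is a NameValueCollection) the threshold could be stashed on the queue and read back roughly like this; the key name "PoisonThreshold" is just my placeholder, not necessarily what the CloudDAL uses:

// set once, when the queue is provisioned
queue.Metadata["PoisonThreshold"] = "3";
queue.SetMetadata();

// later, when a message is dequeued
queue.FetchAttributes();                                         // refreshes metadata from the service
int threshold = int.Parse(queue.Metadata["PoisonThreshold"]);
CloudQueueMessage msg = queue.GetMessage();
if (msg != null && msg.DequeueCount > threshold)
{
    queue.DeleteMessage(msg);                                    // abandon the poisoned message
}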

Lines 16-26: Here we’re initializing the storage classes implemented in the CloudDAL layer.  There’s also a flag set to check if AppFabric Caching should be used to store the tile images from which the mosaic will be constructed.  By default, that configuration parameter is set to false.  We’ll chat about caching in a later post, so you can just skim over this for now.

Line 33: This is a call to a method that creates an in-memory representation of an image library, a collection of images that are used as the tiles (tesserae) to create the mosaic and which are stored together in a given container in Windows Azure blob storage.  You’ll note this call is enclosed by some Stopwatch code; that’s because it’s a fairly compute-intensive operation and I wanted to get a feel for just how long it takes to process an image.

When this code has completed, the ImageLibrary instance (library) will point to a collection of byte arrays, each element of which contains an image from the library resized to the tessera size specified as part of the original request.   Additionally, after each image is resized, an average color value for that image is calculated and stored with the raw bytes.
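
The average-color calculation itself is conceptually simple.  A rough System.Drawing sketch – not the project’s ImageLibrary code, which works against the resized byte arrays – would look something like this:

using System.Drawing;

static Color AverageColor(Bitmap tile)
{
    long r = 0, g = 0, b = 0;
    for (int y = 0; y < tile.Height; y++)
        for (int x = 0; x < tile.Width; x++)
        {
            Color c = tile.GetPixel(x, y);   // GetPixel is slow; fine for a sketch, not for production
            r += c.R; g += c.G; b += c.B;
        }

    int pixels = tile.Width * tile.Height;
    return Color.FromArgb((int)(r / pixels), (int)(g / pixels), (int)(b / pixels));
}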

Line 45:   The Mosaicize method is the heavy lifter for the entire application.  For each pixel of the incoming image slice, this method takes the color value of that pixel, finds the image in the in-memory ImageLibrary that has an average color value closest to it, and selects that image to be the tile for that pixel.   Truth be told, there’s a bit of a tolerance factor built-in here, and it will actually pick randomly from all tile images with an average color value within 15% of the color value of the original pixel; this is just to provide a bit of variation in the output image.

As the method processes pixel by pixel, it also builds the output image in memory, and when processing is complete, the in-memory bytes for that image are returned as the return value of the Mosaicize function.
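
Here is one plausible reading of that selection logic, sketched in C#.  The tile list shape, the Euclidean color distance, and the way the 15% tolerance is applied are all my assumptions, not the actual Mosaicize implementation:

using System;
using System.Collections.Generic;
using System.Drawing;
using System.Linq;

static byte[] PickTile(Color pixel, IList<KeyValuePair<Color, byte[]>> tiles, Random rng)
{
    Func<Color, Color, double> distance = (a, b) =>
        Math.Sqrt(Math.Pow(a.R - b.R, 2) + Math.Pow(a.G - b.G, 2) + Math.Pow(a.B - b.B, 2));

    double tolerance = 0.15 * Math.Sqrt(3 * 255 * 255);          // ~15% of the maximum color distance

    var candidates = tiles.Where(t => distance(pixel, t.Key) <= tolerance).ToList();
    if (candidates.Count == 0)                                   // nothing close enough: fall back to the best match
        candidates.Add(tiles.OrderBy(t => distance(pixel, t.Key)).First());

    return candidates[rng.Next(candidates.Count)].Value;         // random pick adds variation to the mosaic
}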

Line 62: With the processing complete, the raw bytes returned by Mosaicize are stored in blob storage (the sliceoutput container), and….

Line 66: a new queue message is placed on the SliceResponse queue indicating that this particular section of the mosaic has been completed.  The message includes the URL of the blob in sliceoutput that holds the processed image.  The JobController role polls this queue and acts on the message when it appears, moving the workflow along.

Line 69: Now that everything has been completed for this task in the application workflow, it’s safe to explicitly delete the message that spawned the processing in the first place.  If you were to omit this step, the message invisibility timeout would eventually expire, and the message would show back up on the queue only to be processed once again!

If you gloss over the specific processing goals of this role, you’ll see that a pattern emerges – one that’s implemented by all of the roles in the Azure Photo Mosaics application. The pseudo-code below captures that pattern:

 get next message from queue
if there’s really a message
   might it be a poisoned message?
   if not, carry out whatever processing is appropriate for that message
   delete the message
else
   check again after some interval of time
end
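
Rendered in C# against the application’s own abstractions (Process here is a stand-in for the role-specific work; the other names mirror those in the VB listing above), the skeleton of each worker looks roughly like this:

while (true)
{
    var requestMsg = QueueAccessor.IsAvailable
        ? QueueAccessor.SliceRequestQueue.AcceptMessage<SliceRequestMessage>()
        : null;

    if (requestMsg == null)
    {
        Thread.Sleep(TimeSpan.FromSeconds(10));            // nothing to do yet; poll again shortly
        continue;
    }

    if (requestMsg.RawMessage.DequeueCount > requestMsg.Queue.PoisonThreshold)
    {
        requestMsg.DeleteFromQueue();                      // give up on a likely poisoned message
        continue;
    }

    try
    {
        Process(requestMsg);                               // role-specific work (mosaicize, assemble, notify, ...)
        requestMsg.DeleteFromQueue();                      // delete only after the work has succeeded
    }
    catch (Exception e)
    {
        Trace.TraceError("Request: {0} Exception: {1}", requestMsg.RequestId, e);
        // message is not deleted, so it will reappear after the invisibility timeout
    }
}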

Points of Failure

As we did with the ClientInterface web role above, let’s look at a few things that could go ‘wrong,’ and see how the system would behave.

1.  Lines 6 and 7 return a message of type SliceRequestMessage.  What happens if the message on that queue doesn’t conform to that specific type?

In the storage scheme I set up, each queue has one specific message type that it handles, so if the code is correct, this condition should never happen.  If for some reason though a non-parseable message did find its way into a queue, the AcceptMessage method would detect it, log an exception, and return null indicating there’s no message to process (see the previous post in this series for a code-level examination of this). 

In my implementation, the message is not deleted though and therefore would be considered a poisoned message!  Of course, we just reviewed the code to handle poisoned messages, so we know there will not be any long-lasting negative effects on the system.  Anyone looking at the diagnostic logs would just see repeated failed attempts to process the given message for a number of times (until the poisoned message threshold was reached).

Why not delete the message the first time, since we know it’s poisoned at that point?   Short answer, I should have.  Longer answer, it would have required a bit of code rewrite. 

Right now, AcceptMessage returns either a legitimate message or null; the latter could mean there’s no message at all OR that there was a message but it wasn’t the right type.  Because AcceptMessage returns a strongly typed message, it has no way to hand an invalid message back to the role that is processing the queue, so the role code itself can’t delete that message.

So why not delete the message inside of AcceptMessage, once the parsing attempt fails?  That’s indeed possible, but it causes a bit of cruft to be pulled into that method.  Unfortunately, a CloudQueueMessage itself does not expose a pointer to the queue from which it originated, and the delete operation is performed on a queue, with the message passed as a parameter.  So to delete the message within AcceptMessage would mean including logic (likely involving the QueueAccessor class) to determine the originating queue. 

It’s a solvable problem, just not something I’ve taken the time to address in a more elegant fashion.
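
One way to solve it, sketched below: if the queue wrapper behind AcceptMessage held a reference to its own underlying CloudQueue, it could delete the unparseable message right where the parse fails.  This is my suggestion rather than the CloudDAL’s actual structure, and Deserialize is a hypothetical helper:

public T AcceptMessage<T>() where T : class
{
    CloudQueueMessage raw = this.cloudQueue.GetMessage();        // 'cloudQueue' is the wrapped CloudQueue
    if (raw == null) return null;                                // nothing pending

    try
    {
        return Deserialize<T>(raw.AsString);                     // hypothetical parsing helper
    }
    catch (Exception e)
    {
        Trace.TraceWarning("Unparseable message {0} removed from queue: {1}", raw.Id, e.Message);
        this.cloudQueue.DeleteMessage(raw);                      // delete now rather than letting it become a poison message
        return null;                                             // caller still sees 'no message to process'
    }
}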

2.  What happens if there’s some exception that occurs once a message has been pulled off the queue (after it’s passed the poisoned-message test)?

By now, you probably should know the answer.  Exception handling code beginning at Line 73 will kick in, recording a message to Windows Azure diagnostics as well as to the status table.  Notably, the message itself is not deleted, so there will be another chance to process it once the invisibility timeout interval expires.

3.  What happens if the poisoned-message (Line 11) test yields a false positive?

There’s the potential, depending on the invisibility timeout value provided when dequeuing a message, that there’s just not enough time to finish processing the message before it pops back on the queue to be processed again by another role instance.   If that occurs, the message is not really poisoned, but it will be treated as such.   The end result is that the specific task will have been performed multiple times (up to the poison threshold value assigned to the particular queue in the Azure Photo Mosaics application). 

That shouldn’t be a problem if you adhere to the rules and make the work performed idempotent.  The administrator of the application will, however, see messages in the logs and status table indicating the message was poisoned, when it really wasn’t.  The end-user, of course, goes merrily along not realizing any of this has occurred, short of perhaps noticing it took a bit longer than usual to finish the job.

4.  What happens if the message is indeed poisoned, and the work does not get done in the allotted amount of time?

The answer to this depends on what specific task is being executed.  For instance, if this were to have occurred in the ImageProcessor role code above, the result would be that there is no processed slice for the given input.  If the original image were divided into say five slices, it could mean that four of the slices completed, and the fifth didn’t – and never will!  In that case, the job will never successfully complete, and you need to have some remediation strategy in place. 

There will be indications of a problem – in the diagnostics logs specifically – and you could have some automated process in place to monitor those logs and inform a human that there’s a problem.  Or you could even extend the application to make it ‘self-healing.’  With the code above, you know when the poisoned message has been detected and (false positives notwithstanding) could add additional functionality to clean up that job’s remnants (including associated queue messages and blobs) as well as notify the end-user there was a problem.  That’s a fair bit of additional code that’s tangential to what I wanted to cover in this series, but it’s something to consider thoughtfully when creating a resilient and user-friendly application.

With this post, we’ve reached a bit of a milestone in the series.  The complete architecture – storage and processing – has now been laid out, and we can focus on more specific aspects of the application next, like the use of the Service Bus, AppFabric caching, diagnostics, etc.   Look for additional posts soon, although there may be a lull given a few weeks of travel on my docket.