Share via


5 – Executing Background Tasks

patterns & practices Developer Center

On this page: Download:
The Premise | Goals and Requirements | Overview of the Solution | Storing Receipt Images | Background Processing - Detecting an New Uploaded Image, Using Azure Storage Queues, Handling Transient Faults when Accessing Azure Storage, Controlling Access to Receipt Images | Inside the Implementation | Uploading and Saving Images | Abstracting the Worker Role - User Code in the aExpense Application, The Plumbing Code Classes | Processing the Images | Making the Images Available Using Shared Access Signatures | More Information Download code samples
Download PDF

This chapter walks you through the changes in the cloud-based version of the aExpense application that Adatum made when they added support for uploading, storing, and displaying scanned images of receipts. You'll see how the application uses Microsoft Azure blob storage to store the image data, how the application uses a worker role in Azure to perform background processing tasks on the images, and how the application uses shared access signatures to control access to the images by users. The chapter also introduces a simple set of abstractions that wrap a worker role, in the expectation that the aExpense application will need to perform additional background tasks in the future.

The Premise

During this phase of the project, the team at Adatum turned their attention to the requirement for a background process in the aExpense application that performs some processing on the scanned images of business expense receipts that users upload.

The original on-premises web application enables users to upload images of their business expense receipts, and the application assigns a unique name to each image file before it saves the image to a file share. It then stores the path to the image in the SQL Server database that holds the business expenses data. The application can then retrieve the image related to an expense submission and display it through the user interface (UI) later.

The completed on-premises application also has a background process that processes the images, which is implemented as a Windows service. This process performs two tasks: it compresses the images to preserve disk space, and it generates a thumbnail image. By default, the application's UI displays the thumbnail, but if a user wants to see a more detailed image, it enables viewing the full-sized version of the image.

Goals and Requirements

Adatum’s developers must implement the receipt image upload function into the migrated version of the application running in Azure. Adatum has a number of goals for the implementation of the image processing component of the application.

Firstly, Adatum wants to minimize the storage requirements for the images while maintaining the legibility of the information on the receipts.

Storage for scanned receipt images will be one of the monthly costs for the application.

Adatum also wants to maintain the responsiveness of the application and minimize the bandwidth required by the UI. A user shouldn't have to wait after uploading an image while any necessary processing takes place, and the application should display image thumbnails with an option to display a full-sized version.

Finally, Adatum wants to maintain the privacy of its employees by making receipts visible only to the employee who submitted them and to the people responsible for approving the expense submission.

Ff803365.note(en-us,PandP.10).gifJana Says:
Jana
                We need to protect the privacy of the users of this application.</td>

Overview of the Solution

The team at Adatum made several significant changes to the implementation of the aExpense application for the Azure based version in this phase.

Storing Receipt Images

The first decision was to select a storage mechanism for the scanned receipt images. Azure storage provides the following core services for persistent and durable data storage in the cloud:

  • Azure blobs provide a series of containers aimed at storing text or binary data. Block blob containers are ideal for streaming data, while page blob containers can be used for random read/write operations.
  • Azure queues provide a mechanism for reliable, persistent messaging between role instances, such as between a web role and a worker role, and between roles and non-hosted applications and services.
  • Azure tables provide a non-relational table-structured storage mechanism. Tables are collections of entities that do not have an enforced schema, which means a single table can contain entities that have different sets of properties. This mechanism is primarily aimed at scenarios where large volumes of data must be stored, while being easy to access and update.
  • Azure drives provide a mechanism for applications to mount a single volume NTFS VHD as a page blob, and access it as though it were a local hard drive. They are aimed at scenarios where applications rely on access to a physical file system.

The Azure storage services support both a managed API and a REST API that can be used from within Azure-hosted or on-premises (remote) applications.

A simple approach to the application's storage requirements would be to use the Azure storage drive. This would require minimal changes to the code used in the on-premises version because the Azure drive is a simple NTFS volume that you can access by using the standard .NET I/O classes. The major drawback of this approach is that you can write to the Azure drive from only one role instance at a time. Adatum plans to deploy multiple instances of the aExpense web role to ensure high availability and scalability for the application.

While Azure tables could be used to store the binary data for an image, blob storage is better suited to this type of data. A single entity in table storage is limited to 1 MB in size, whereas a block blob could be up to 200 GB in size. Furthermore, it is easy to reference an image file stored in blob storage with an HTTP or HTTPS address. Therefore, the approach adopted by Adatum was to store the image data in Azure block blob storage. Although this approach requires more changes to the code compared to using an Azure storage drive, it is compatible with using multiple role instances.

Background Processing

The second decision was how to implement background processing for tasks in the aExpense application. Adatum could simply execute the background tasks asynchronously within the application’s web role. Carrying out tasks asynchronously in the background offloads work from the application's request handling code, which helps to improve the responsiveness of the UI.

However, this does not allow for individual scaling of the background processing code. If the background tasks require additional processing capacity Adatum must increase the number of deployed instances to accommodate this, even if additional UI processing capacity is not required.

The alternative is to create a separate worker role and execute all of the background tasks within this role. Unlike simply emulating a background role by using asynchronous code in the web role, using a worker role allows for individual scaling of the roles. The developers at Adatum realized that it would be better to implement the UI and the background processing as separate roles that could be scaled individually. However, they also realized that running separate worker role instances will increase the hosting costs of the application because each role instance is charged on an hourly basis.

A worker role is the natural way to implement a background process in Azure. You can scale each role type independently.

Therefore, Adatum decided to implement the image processing service by using a worker role in the cloud-based version of aExpense. Most of the code from the existing on-premises version of the application that compresses images and generates thumbnails was simply repackaged in the worker role. What did change was the way that the image processing service identified when it had a new image to process.

Ff803365.note(en-us,PandP.10).gifBharath Says:
Bharath
                Adatum should review the cost estimates for the aExpense application now that the application includes a worker role.</td>

Detecting an New Uploaded Image

Receipt images are uploaded to the aExpense application through the UI, which stores them ready for processing. In the original on-premises version, a Windows service uses the FileSystemWatcher class to generate an event whenever the application saves a new image to the file share. The aExpense application then invokes the image processing logic from the OnCreated event handler, as shown in Figure 1.

Figure 1 - On-premises image processing

Figure 1

On-premises image processing

For the cloud-based version of the application using blob storage, this approach won't work because there is no Azure equivalent of the FileSystemWatcher class for blob storage. Instead, as Figure 2 shows, Adatum decided to use an Azure queue.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus
                We can use one or more Azure storage queues to communicate between roles. In this example we use a single queue to communicate between the web and the worker role.</td>

Figure 2 - Cloud-based image processing

Figure 2

Cloud-based image processing

You might consider using Azure Service Bus queues instead of Azure storage queues. Service Bus queues have advantages such as automatic de-duplication and long polling timescales, and can provide a level of indirection as well as being accessible through firewalls and network address translation (NAT) routers. However, for simple communication between Cloud Service roles, Azure storage queues typically offer sufficient capabilities and may provide better scalability and performance due to their inherent simplicity when compared to Service Bus queues.

Whenever a user uploads a new image to aExpense as part of an expense submission, the application UI writes a message to the queue and saves the image to blob storage. The worker role will pick up messages from the queue, compress the image, and then generate a thumbnail. The worker role saves the new image and thumbnail in blob storage.

After the worker role completes the image processing, it updates the expense item entity in Azure table storage to include references to the image and the thumbnail, and then deletes the original image. When displaying the updated image in the UI, the application locates the images in blob storage using the information maintained in the expense item entity.

Note

It’s possible to implement background processes so that they run at a scheduled time, or so that they can run for a long time with the ability to restart part way through in the event of a failure, by using the features available in Azure storage queues. For more information see “Azure Queues Improved Leases, Progress Tracking, and Scheduling of Future Work.”

Using Azure Storage Queues

When using an Azure queue it is possible that a message could be read more than once. Different worker roles may try to read the same message, or a single worker role may attempt to read it again. For example, in the aExpense application a duplicate message would cause the worker role to try to resize the original image and to generate the thumbnail a second time, overwriting the saved compressed image and thumbnail. In addition to the unnecessary processing, this will fail if the previous process has already deleted the original uploaded image.

With Azure queues, it is possible that a message could be read twice by the same worker role, or by two different worker roles.

To prevent this, message processing tasks must be idempotent. Either the task execution must not in any way affect the operation of the application or the integrity of the application's data; or the code must prevent the task being executed more than once.

If your message processing method is not inherently idempotent, there are several strategies that you can adopt to stop the message recipient processing a message multiple times:

  • When you read a message from a queue, you can use the visibilityTimeout parameter to set how long the messages should be hidden from other readers (the default value is 30 seconds). This gives you time to make sure that you can process and delete the message from the queue before another client reads it. Getting a message from a queue does not automatically delete it from the queue. It is still possible for the visibilityTimeout period to expire before you delete the message; for example, if the method processing the message fails.
  • Each message has a DequeueCount property that records how many times the message has been retrieved from the queue. However, if you rely on this property, and only process messages that have a DequeueCount value of 0, your application must guard against the possibility that a message has been dequeued but not processed.
  • You could also add a unique transaction identifier to the message and then save the identifier in the blob's metadata properties. If, when you retrieve a message from a queue, the unique identifier in the message matches the unique identifier in the blob's metadata, you know that the message has already been processed once.

Handling Transient Faults when Accessing Azure Storage

Access to Window Azure storage is by using a URL that specifies the storage account and the path to the data items. This request will pass over the datacenter network if the storage account you use is in the same datacenter as the code making the request, which should typically be the case to achieve optimum performance. However, it is still possible that requests may fail to connect immediately due to transient network loads. This is especially the case if you need to access the storage account from outside that datacenter.

In Chapter 4, “Moving to Microsoft Azure SQL Database,” you saw how Adatum uses the Enterprise Library Transient Fault Handling Block to provide a reliable retry mechanism to connecting to a database. The application block can also be used to connect to Azure storage (blobs, queues, and tables). Adatum uses the application block in the UI part of the aExpense application to connect to the blob and queue when writing the uploaded image and posting a message to the worker role. It also uses the application block within the worker role for reading from and writing to blob storage.

The Azure storage client API includes support for custom retry policies. However, using the Transient Fault Handling Application Block enables you to take advantage of the customization offered by the block, use configuration to define your retry policies, and adopt a standard approach to all the retry logic in your application.

Controlling Access to Receipt Images

The application allows users to view only images of receipts that they previously uploaded, or images that belong to business expense submissions that they can approve. The application keeps other images hidden to protect users' privacy. Adatum evaluated a several approaches for achieving this.

Ff803365.note(en-us,PandP.10).gifJana Says:
Jana
                We evaluated three alternative approaches to making business expense receipt images browsable before deciding on shared access signatures.</td>

In Azure, all storage mechanisms can be configured to allow data to be read from anywhere by using anonymous requests, which makes the model shown in Figure 3 very easy to implement:

Figure 3 - Directly addressable storage

Figure 3

Directly addressable storage

In this scenario, you can access blob content directly through a URL such as https://<application>.blob.core.windows.net/<containername>/<blobname>. In the aExpense application, you could save the URLs as part of the expense entities in Azure table storage. The advantages of this approach are its simplicity, the fact that data is cacheable, that it offloads work from the web server, and that it could leverage the Content Delivery Network (CDN) infrastructure in Azure.

Ff803365.note(en-us,PandP.10).gifBharath Says:
Bharath
                CDN enables you to cache blob data at strategically placed locations for delivering the content to users with the maximum available bandwidth.</td>

However, the disadvantage of using directly addressable blob storage is the lack of any security. Therefore, this is not an appropriate approach for the aExpense application because it would make it easy for someone to guess the address of a stored image, although this approach would work well for data that you did want to make publicly available such as logos, branding, or downloadable brochures.

Note

Using deliberately obscure and complex URLs is a possible option, but this approach offers only weak protection and is not recommended.

The second approach considered by Adatum for accessing receipt images in blob storage was to route the request through the web site in much the same way that a “traditional” tiered application routes requests for data through the middle tier. Figure 4 shows this model.

Figure 4 - Routing image requests through the web server

Figure 4

Routing image requests through the web server

In this scenario there is no public access to the blob container, and the web application holds the access keys for blob storage. This is how aExpense writes the image data to blob storage. A possible implementation of this scenario would be to use an HTTP handler to intercept image requests, check the access rules, and return the data. Although this approach would enable Adatum to control access to the images, it would add to the complexity of the web application and increase the workload of the web server. In addition, you couldn't use this approach if you wanted to use the Azure CDN feature.

The approach that Adatum decided on for the aExpense application was to use the Azure Shared Access Signature (SAS) feature. SAS enables you to control access to individual blobs by generating blob access URLs that are valid for defined period of time (though the valid time period can be unlimited if required). In the aExpense application, the web role generates these special URLs and embeds them in the page served to users. These special URLs then allow direct access to the blob for a limited period of time.

Ff803365.note(en-us,PandP.10).gifJana Says:
Jana
                SAS works well from a security perspective; the URL is only valid for a limited period of time, and would be difficult to guess. However, beware of exposing the blob data over the CDN because it is cached there and so the SAS lifetimes may be extended due to this caching. </td>

There is some additional work for the web server because it must generate the SAS URLs, but Azure blob storage handles most of the work. The approach is reasonably secure because the SAS URLs, in addition to having a limited lifetime, also contain a uniquely generated signature, which makes it very difficult for anyone to guess a URL before it expires.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus
                In addition to using SAS tokens in URLs you can use server-stored access policies with SAS. These policies allow for immediate revocation of access permission. See “<a href="https://blogs.msdn.com/b/windowsazurestorage/archive/2012/06/12/introducing-table-sas-shared-access-signature-queue-sas-and-update-to-blob-sas.aspx">Introducing Table SAS (Shared Access Signature), Queue SAS and update to Blob SAS</a>.” for more information on using SAS with tables, queues, and blobs.</td>

Inside the Implementation

Now is a good time to walk through these changes in more detail. As you go through this section, you may want to download the Visual Studio solution from https://wag.codeplex.com/. This solution (in the Azure-WorkerRole folder) contains the implementation of aExpense after the changes in this phase are made. If you are not interested in the mechanics, you should skip to the next section.

The Hands-on Labs that accompany this guide provide a step-by-step walkthrough of parts of the implementation tasks Adatum carried out on the aExpense application at this stage of the migration process.

Uploading and Saving Images

In the aExpense application, the web role is responsible for uploading the image from the user's workstation and saving the initial, uncompressed version of the image. The following code in the SaveExpense method in the ExpenseRepository class (located in the DataAccessLayer folder of the aExpense.Shared project) calls the AddReceipt method of the ExpenseReceiptStorage class for each expense item that the user submits.

this.receiptStorage.AddReceipt(expenseItem.Id.ToString(), 
       expenseItem.Receipt, string.Empty);

The following code from the ExpenseReceiptStorage class shows how the AddReceipt method saves the original, uncompressed image to blob storage. Notice how it uses the Transient Fault Handling Block to retry all storage access operations.

public string AddReceipt(string receiptId, byte[] receipt, 
    string contentType)
{
  CloudBlob blob = this.storageRetryPolicy.ExecuteAction(
            () => this.container.GetBlobReference(receiptId));
  blob.Properties.ContentType = contentType;
  this.storageRetryPolicy.ExecuteAction(
           () => blob.UploadByteArray(receipt));  
  return blob.Uri.ToString();
}

Note

The retry policy used in the AddReceipt method is initialized in the class constructor by calling the GetDefaultAzureStorageRetryPolicy method of the Transient Fault Handling Block’s static RetryPolicyFactory class.

Abstracting the Worker Role

Figure 5 summarizes the common pattern for the interaction between web roles and worker roles in Azure.

Figure 5 - Web-to-worker role communication with an Azure queue

Figure 5

Web-to-worker role communication with an Azure queue

Ff803365.note(en-us,PandP.10).gifJana Says:
Jana Using an Azure storage queue is a very common pattern for communicating between the web role and the worker role.

In this pattern, to communicate with the worker role, a web role instance places messages on to a queue. A worker role instance polls the queue for new messages, retrieves them, and processes them. There are a couple of important things to know about the way the queue service works in Azure. First, you reference a queue by name, and multiple role instances can share a single queue. Second, there is no concept of a typed message; you construct a message from either a string or a byte array. An individual message can be no more than 64 kilobytes (KB) in size.

Note

If the size of your messages could be close to the maximum, be aware that Azure converts all messages to Base64 before it adds them to the queue.

In addition, Azure implements an “at-least-once” delivery mechanism; thus, it does not guarantee to deliver messages on a first-in, first-out basis, or to deliver only a single copy of a message, so your application should handle these possibilities.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus Azure does not guarantee to deliver messages on a first-in, first-out basis, or to deliver only a single copy of a message.

In the current phase of the migration of aExpense to Azure, the worker role only performs a single task. However, Adatum expects the worker role to take on additional responsibilities in later phases. Therefore, the team at Adatum developed some simple “plumbing” code to hide some of the complexities of Azure worker roles and Azure queues and to make them easier to work with in the future. Figure 6 is a high-level description of these abstractions and shows where to plug in your custom worker role functionality.

Figure 6 - Relationship of user code to plumbing code

Figure 6

Relationship of user code to plumbing code

The “user code” classes are the ones that you will implement for each worker role and job type. The “plumbing code” classes are the re-usable elements. The plumbing code classes are packaged in the AExpense.Jobs, AExpense.Queues, and AExpense.QueueMessages namespaces (the queue and message classes are in the aExpense.Shared project). The following sections first discuss the user code and then the plumbing code.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus For any new background task, you need to implement only the “user code” components.

User Code in the aExpense Application

The code you'll see described in this section implements the job that will compress images and generate thumbnails in the worker role for the aExpense application. What you should see in this section is how easy it is to define a new job type for the worker role to perform. This code uses the plumbing code that the following section describes in detail.

The following code in the WorkerRole class shows how the application initializes the worker role using the plumbing code. You create a new class that derives from the JobWorkerRole and override the CreateJobProcessors method. In this method, you instantiate your job processing objects that implement the IJobProcessor interface. As you can see, this approach makes it easy to plug in any additional job types that implement the IJobProcessor interface.

public class WorkerRole : JobWorkerRole
{
  protected override IEnumerable<IJobProcessor> 
       CreateJobProcessors()
  {
    return new IJobProcessor[] { new ReceiptThumbnailJob() };
  }
}

The constructor for the ReceiptThumbnailJob class specifies the interval the worker role uses to poll the queue. It instantiates an AzureQueueContext object, an ExpenseReceiptStorage object, and an ExpenseRepository object for the worker role to use, as shown here.

public ReceiptThumbnailJob()
    : base(2000, new AzureQueueContext())
{
  this.receiptStorage = new ExpenseReceiptStorage();
  this.expenseRepository = new ExpenseRepository();
}

The plumbing code passes a NewReceiptMessage object that contains the details of the image to process to the ProcessMessage method. This method then compresses the image referenced by the message and generates a thumbnail. The following code shows the ProcessMessage method in the ReceiptThumbnailJob class.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus Worker roles must poll the queue for new messages.
public override bool ProcessMessage(NewReceiptMessage message)
{
  …
}

In the aExpense application, to send a message containing details of a new receipt image to the worker role, the web role creates a NewReceiptMessage object and calls the AddMessage method of the AzureQueueContext class. The following code shows the definition of the NewReceiptMessage class.

[DataContract]
public class NewReceiptMessage : BaseQueueMessage
{
    [DataMember]
    public string ExpenseItemId { get; set; }
}

Note

It's important to use the DataContract and DataMember attributes in your message class because the AzureQueueContext class serializes message instances to the JSON format.

The last few lines of code in the SaveExpense method of the ExpenseRepository class show how the web role in aExpense posts a message onto the queue.

public void SaveExpense(Expense expense)
{
  using (var db = new ExpensesDataContext(
                      this.expenseDatabaseConnectionString))
  {
    var entity = expense.ToEntity();
    db.Expenses.InsertOnSubmit(entity);

    foreach (var detail in expense.Details)
    {
      var detailEntity = detail.ToEntity(expense);
      db.ExpenseDetails.InsertOnSubmit(detailEntity);
      if (detail.Receipt != null && detail.Receipt.Length > 0)
      {
        this.receiptStorage.AddReceipt(detail.Id.ToString(),
                              detail.Receipt, string.Empty);
      }
    }
    this.sqlCommandRetryPolicy.ExecuteAction(
            () => db.SubmitChanges());

    var queue = new AzureQueueContext(this.account);
    expense.Details.ToList().ForEach(i => queue.AddMessage(
            new NewReceiptMessage 
                   { ExpenseItemId = i.Id.ToString() })); }
}

Notice that the method does not place any messages on the queue to notify the worker role that new images are ready for processing until after all the records have been saved. This way, there is no chance that the worker role will try to process an image before the associated record has been saved and fail because it can't find the record (remember that the worker role must update the URLs of the image and thumbnail in the detail record).

The Plumbing Code Classes

Adatum developed the worker role abstractions to simplify the way that it sends messages from a web role to a worker role, and to simplify the way that it codes a worker role. The idea is that code can put a typed message onto a queue, and when the worker role retrieves the message, it routes it to the correct job processor for that message type. The previous section of this chapter described the job processor in the aExpense application that processes scanned receipt images using these abstractions. This section describes the abstractions in more detail.

The plumbing code classes simplify the way that you send messages from a web role to a worker role and the way that you implement a worker role.

The following core elements make up these plumbing code classes:

  • A wrapper for the standard Azure worker role's RoleEntryPoint class named JobWorkerRole that abstracts the worker role’s life cycle and threading behavior.
  • A customizable processor class named JobProcessor that enables users of the plumbing code classes to define their own job types for the worker role.
  • A wrapper for the standard Azure CloudQueue class named AzureQueueContext that implements typed messages to enable message routing within the JobWorkerRole class.
Ff803365.note(en-us,PandP.10).gifJana Says:
Jana Adatum expects to implement additional background processes, so it makes sense to create this plumbing code.
  • Figure 7 summarizes how the plumbing code classes handle messages derived from the BaseQueueMessage class.

Figure 7 - Worker role plumbing code elements

Figure 7

Worker role plumbing code elements

The message types that the plumbing code classes handle (such as the NewReceiptMessage type in aExpense) are derived from the BaseQueueMessage class shown in the following code example.

[DataContract]
public abstract class BaseQueueMessage
{
  private object context;

  public object GetContext()
  {
    return this.context;
  }

  public void SetContext(object value)
  {
    this.context = value;
  }
}

The plumbing code classes use the AzureQueueContext class to deliver messages to the worker role. The AzureQueueContext class creates named queues based on the message types, one queue for each message type that was registered with it. The following code shows the AddMessage method in the AzureQueueContext class that you use to add a new message to a queue, and the ResolveQueueName method that figures out the name of the queue to use.

public void AddMessage(BaseQueueMessage message)
{
  var queueName = ResolveQueueName(message.GetType());

  var json = Serialize(message.GetType(), message);
  var cloudQueue = this.storageRetryPolicy.ExecuteAction(
     () => this.queue.GetQueueReference(queueName));
  this.storageRetryPolicy.ExecuteAction(
     () => cloudQueue.AddMessage(new CloudQueueMessage(json)));
}

public static string ResolveQueueName(MemberInfo messageType)
{
  return messageType.Name.ToLowerInvariant();
}

Notice that the plumbing code serializes messages to the JSON format, which typically produces smaller message sizes than an XML encoding (but possibly larger than a binary encoding).

The AzureQueueContext class uses an Azure queue, and it’s important that this queue has been created before performing any operations against it. The public Purge method of the AzureQueueContext class calls a private method named EnsureQueueExists that itself calls the CreateIfNotExist method of the Azure CloudQueue class. Calling the CreateIfNotExist method counts as a storage transaction and will add to your application's running costs. To minimize the number of calls made to the CreateIfNotExist method the AzureQueueContext class maintains a list of queue names that it has already checked, as shown here.

private readonly ICollection<string> ensuredQueues; 
…
private void EnsureQueueExists(string queueName)
{
  if (!this.ensuredQueues.Contains(queueName))
  {
    this.queue.GetQueueReference(queueName).CreateIfNotExist();
    this.ensuredQueues.Add(queueName);
  }
}       

In Chapter 7, “Moving to Microsoft Azure Table Storage,” the way that the application calls the CreateIfNotExist is revaluated following the results from performance testing.

Ff803365.note(en-us,PandP.10).gifPoe Says:
Poe If you are concerned about the running costs of the application, you should be aware of which calls in your code are chargeable! If you have high volumes of messages, you should check how frequently your application calls methods such as CreateIfNotExist.

The plumbing code classes deliver messages to job processor components, where a job processor handles a specific message type. The plumbing code classes include an interface named IJobProcessor that defines two void methods named Run and Stop for starting and stopping a processor. The abstract BaseJobProcessor and JobProcessor classes implement this interface. In the aExpense application, the ReceiptThumbnailJob class that you've already seen extends the BaseJobProcessor class.

The following code example shows how the JobProcessor class implements the IJobProcessor interface.

private bool keepRunning;

public void Stop()
{
  this.keepRunning = false;
}

public void Run()
{
  this.keepRunning = true;
  while (this.keepRunning)
  {
    Thread.Sleep(this.SleepInterval);
    this.RunCore();
  }
}

protected abstract void RunCore();

The following code example shows how the BaseJobProcessor class provides an implementation of the RunCore method.

protected bool RetrieveMultiple { get; set; }
protected int RetrieveMultipleMaxMessages { get; set; }

protected override void RunCore()
{
  if (this.RetrieveMultiple)
  {
    var messages = this.Queue.GetMultipleMessages<T>
                   (this.RetrieveMultipleMaxMessages);
    if (messages != null)
    {
      foreach (var message in messages)
      {
        this.ProcessMessageCore(message);
      }
    }
    else
    {
      this.OnEmptyQueue();
    }
  }
  else
  {
    var message = this.Queue.GetMessage<T>();
    if (message != null)
    {
      this.ProcessMessageCore(message);
    }
    else
    {
      this.OnEmptyQueue();
    }
  }
}
Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus We don’t use the Transient Fault Handling Block when reading messages from a queue because the worker role regularly polls the queue to receive messages, and so a failure during one polling cycle will leave the message in the queue ready to be received in the next one.

As you can see, the RunCore method can retrieve multiple messages from the queue in one go. The advantage of this approach is that one call to the GetMessages method of the Azure CloudQueue class only counts as a single storage transaction, regardless of the number of messages it retrieves. The code example also shows how the BaseJobProcessor class calls the generic GetMessage and GetMultipleMessages methods of the AzureQueueContext class, specifying the message type by using a generic type parameter.

Ff803365.note(en-us,PandP.10).gifBharath Says:
Bharath It's cheaper and more efficient to retrieve multiple messages in one go if you can. However, these benefits must be balanced against the fact that it will take longer to process multiple messages; this risks the messages becoming visible to other queue readers before you delete them.

The following code example shows how the BaseJobProcessor constructor assigns the job's polling interval and the AzureQueueContext reference.

protected BaseJobProcessor(int sleepInterval,
          IQueueContext queue) : base(sleepInterval)
{
  if (queue == null)
  {
    throw new ArgumentNullException("queue");
  }
  this.Queue = queue;
}

The remaining significant methods in the BaseJobProcessor class are the ProcessMessageCore and the abstract ProcessMessage methods shown below.

protected int MessagesProcessed { get; set; }

private void ProcessMessageCore(T message)
{
  var processed = this.ProcessMessage(message);
  if (processed)
  {
    this.Queue.DeleteMessage(message);
    this.MessagesProcessed++;
  }
}

public abstract bool ProcessMessage(T message);

The RunCore method in the BaseJobProcessor class invokes the ProcessMessageCore method shown above when it finds new messages to process. The ProcessMessageCore method then calls the “user-supplied” implementation of the ProcessMessage method before it deletes the message from the queue. In the aExpense application, this implementation is in the ReceiptThumbnailJob class.

The final component of the plumbing code is the abstract JobWorkerRole class that wraps the standard Azure RoleEntryPoint class for the worker role. The following code example shows the Run method in this class.

protected IEnumerable<IJobProcessor> Processors { get; set; }

protected abstract IEnumerable<IJobProcessor>
                              CreateJobProcessors();

public override void Run()
{
  this.keepRunning = true;
  this.Processors = this.CreateJobProcessors();
  this.Tasks = new List<Task>();

  foreach (var processor in this.Processors)
  {
    var t = Task.Factory.StartNew(processor.Run);
    this.Tasks.Add(t);
  }

  // Control and restart a faulted job
  while (this.keepRunning)
  {
    for (int i = 0; i < this.Tasks.Count; i++)
    {
      var task = this.Tasks[i];
      if (task.IsFaulted)
      {
        // Observe unhandled exception
        if (task.Exception != null)
        {
          Trace.TraceError("Job threw an exception: " 
                + task.Exception.InnerException.Message);
        }
        else
        {
          Trace.TraceError("Job Failed no exception thrown.");
        }
        var jobToRestart = this.Processors.ElementAt(i);
        this.Tasks[i] = Task.Factory.StartNew(jobToRestart.Run);
      }
    }
    Thread.Sleep(TimeSpan.FromSeconds(30));
  }
}

The Run method invokes the abstract CreateJobProcessors method that is implemented in user code. In the aExpense application, you can find the implementation of the CreateJobProcessors method in the WorkerRole class. The Run method then creates a new task for each job processor, monitors the state so that it can restart any that fail, and then waits for all the tasks to finish.

Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus You need to keep the tasks within the worker role alive!

Processing the Images

The following code example shows how the aExpense application implements the image processing functionality in the ProcessMessage method in the ReceiptThumbnailJob class.

public override bool ProcessMessage(NewReceiptMessage message)
{
  var expenseItemId = message.ExpenseItemId;
  var imageName = expenseItemId + ".jpg";

  byte[] originalReceipt 
          = this.receiptStorage.GetReceipt(expenseItemId);

  if (originalReceipt != null && originalReceipt.Length > 0)
  {
    var thumb = ResizeImage(originalReceipt, ThumbnailSize);
    var thumbUri = this.receiptStorage.AddReceipt(
                     Path.Combine("thumbnails", imageName), 
                                   thumb, "image/jpeg");

    var receipt = ResizeImage(originalReceipt, PhotoSize);
    var receiptUri = this.receiptStorage.AddReceipt(
                       imageName, receipt, "image/jpeg");

    this.expenseRepository.UpdateExpenseItemImages(
                      expenseItemId, receiptUri, thumbUri);
    this.receiptStorage.DeleteReceipt(expenseItemId);

    return true;
  }

  return false;
}

This method retrieves the image name from the message sent to the worker role and creates two new versions of the image: one a thumbnail, and one a fixed standard size. It then deletes the original image. The method can process images in any standard format, but it always saves images as JPEGs.

Ff803365.note(en-us,PandP.10).gifJana Says:
Jana Although we limit users to uploading images that are less than 1,024 KB in size, we decided not to store the original images in order to save space. We found that resizing to a standard size provided acceptable quality.

Note

The ProcessMessage method should be idempotent, so there are no unwanted side effects if a message is delivered multiple times. The ProcessMessage method should also contain some logic to handle “poison” messages that cannot be processed for any reason.

Making the Images Available Using Shared Access Signatures

To make images of receipts viewable in the UI, the team at Adatum used Shared Access Signatures (SAS) to generate short-lived, secure URLs to address the images in blob storage. This approach avoids having to give public access to the blob container, and minimizes the amount of work that the web server has to perform because the client can access the image directly from blob storage.

The aExpense application uses Shared Access Signatures (SAS) to provide limited access to blobs in private containers. As well as controlling access to blobs at container level you can configure SAS for individual blobs, queues, tables, and partition or row key ranges in a table.

The following code example shows how the application generates the SAS URLs in the GetExpenseByID method in the ExpenseRepository class by appending the SAS to the blob URL. The aExpense application uses an HTTPS endpoint, so the blob reference and signature elements of the blob's URL are protected by SSL from "man-in-the-middle" attacks.

CloudBlob receiptBlob = this.storageRetryPolicy.ExecuteAction(
  () => container.GetBlobReference(item.ReceiptUrl.ToString())); 
item.ReceiptUrl = new Uri(item.ReceiptUrl.AbsoluteUri +
    receiptBlob.GetSharedAccessSignature(policy));
Ff803365.note(en-us,PandP.10).gifMarkus Says:
Markus The Shared Access Signature is appended to the standard blob URL. Using SSL ensures that all URL data except for the hostname is encrypted.

The GetSharedAccessSignature method takes a SharedAccessPolicy object as a parameter. This policy specifies the access permissions and the lifetime of the generated URL. The following code shows the policy that the aExpense application uses to grant read permission for an image. The application generates a new SAS whenever a user tries to access an expense submission.

private readonly TimeSpan sharedSignatureValiditySpan;

var policy = new SharedAccessPolicy
{
  Permissions = SharedAccessPermissions.Read, 
  SharedAccessStartTime = DateTime.UtcNow.AddMinutes(-5),
  SharedAccessExpiryTime = DateTime.UtcNow + 
                           this.sharedSignatureValiditySpan
};

The value of the Timespan variable named sharedSignatureValiditySpan is set in the constructor of the the ExpenseRepository class to the same value as the default ASP.NET session timeout. Notice that the code specifies a value five minutes before the current time on the server for the SharedAccessStartTime property of the SharedAccessPolicy object. This is done to prevent any clock skew between the client and the server from preventing access if the user tries to access the blob immediately.

Note

The request will succeed as long as the Get request for a blob starts after the start time and before the expiry time, even if the response streaming continues past the expiry time. In the aExpense application, the user's browser is the client that accesses the data stored in the blob. However, if your client application is using the StorageClient library to read the blob and chooses to retry on a Get failure as a part of the logical Get operation, which is the default StorageClient library policy, any retry request made after the expiration time will fail. The same will apply if you are using the Transient Fault Handling Application Block to handle retries. If you decide to have a short validity period for the URL, make sure that you issue a single Get request for the entire blob and use a custom retry policy so that if you retry the request, you get a new SAS for the URL.

More Information

MSDN is a good starting point to learn about Azure Blob Storage. Start at “How to use the Azure Blob Storage Service in .NET.”

To find out more about controlling access to Azure storage, including shared access signatures, look at “Managing Access to Containers, Blobs, Tables, and Queues.”

You can find a summary of the Azure service architecture at “Overview of Creating a Hosted Service for Azure.”

Next Topic | Previous Topic | Home | Community