WF - The Asynchronous Activity/Service Pattern

In my previous blog post (Creating an Asynchronous Workflow Activity), I explained why your custom activities should either be really fast or run asynchronously, but I didn't give you a real-world example of how to do it. In this post I provide an example of the pattern that my team uses when creating a custom activity.

Pattern:

1) Create a service that actually does the work.

2) Create a custom activity that calls the service and listens on a workflow queue, so that an event handler gets a message back from the service when the work is done.

Instead of walking you through this one, I just want to present you with the code, commented as much as I could :)

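    using System;
    using System.Threading;
    using System.Workflow.ComponentModel;
    using System.Workflow.Runtime;
    using System.Workflow.Runtime.Hosting;

    public class DoSomethingAsyncService : WorkflowRuntimeService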
    {
        private Random m_random;

        public DoSomethingAsyncService()
        {
            m_random = new Random();
        }

        public void DoSomethingAsync(Guid instanceId, Guid queueId)
        {
            ThreadPool.QueueUserWorkItem(delegate(Object state)
                {
                    // Fake a call to a WebService (let's say it takes 10 seconds)
                    Console.WriteLine("Making webservice call.");
                    Thread.Sleep(10000);

                    // Create the QueueItem
                    QueueItem item = new QueueItem();
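                    // Random is not thread-safe, and this delegate can run on several
                    // thread-pool threads at once, so guard the shared instance.
                    lock (m_random)
                    {
                        item.ReturnCode = m_random.Next(0, 10);
                    }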
                    item.Message = String.Format("The return code is {0}.", item.ReturnCode);

                    try
                    {
                        // Now send this item to the appropriate queue
                        WorkflowInstance instance = Runtime.GetWorkflow(instanceId);
                        instance.EnqueueItem(queueId, item, null, null);
                    }
                    catch (InvalidOperationException)
                    {
                        // Catch any InvalidOperationExceptions that occur due to the 
                        // workflow having already completed, etc.
                    }
                });
        }
    }

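    // Marked serializable so that a pending item can be persisted along with
    // the workflow instance if a persistence service is in use.
    [Serializable]
    public class QueueItem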
    {
        public int ReturnCode;
        public String Message;
    }

    public class DoSomethingAsyncActivity : Activity
    {
        private Guid m_queueId;

        protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            // Create my queue
            m_queueId = Guid.NewGuid();
            WorkflowQueuingService queuingService = (WorkflowQueuingService)executionContext.GetService(typeof(WorkflowQueuingService));
            WorkflowQueue queue = queuingService.CreateWorkflowQueue(m_queueId, true);

            // Hook the item available event. This event will be triggered when our
            // service enqueues an item.
            queue.QueueItemAvailable += new EventHandler<QueueEventArgs>(QueueItemAvailable);

            // Call the service to perform the async work.
            DoSomethingAsyncService service = (DoSomethingAsyncService)executionContext.GetService(typeof(DoSomethingAsyncService));
            service.DoSomethingAsync(WorkflowInstanceId, m_queueId);

            // Return Executing to let the workflow know that we are still working.
            return ActivityExecutionStatus.Executing;
        }

        private void QueueItemAvailable(object sender, QueueEventArgs e)
        {
            // The activity execution context is always passed in as the sender.
            ActivityExecutionContext context = sender as ActivityExecutionContext;

            // Checking for a null context just in case something really strange is happening.
            if (context != null)
            {
                // Get the queue service so we can get the Queue object.
                // Note: you cannot hold on to the actual queue object between calls; the workflow runtime does not allow it.
                WorkflowQueuingService queuingService = context.GetService<WorkflowQueuingService>();

                // Make sure the queue exists before going any further.
                if (queuingService.Exists(m_queueId))
                {
                    WorkflowQueue queue = queuingService.GetWorkflowQueue(m_queueId);

                    // Dequeue the item that was queued by the service.
                    QueueItem item = queue.Dequeue() as QueueItem;
                    if (item != null)
                    {
                        // We only expect one thing to ever be put into our Queue;
                        // so, now we remove the queue.
                        queue.QueueItemAvailable -= QueueItemAvailable;
                        queuingService.DeleteWorkflowQueue(m_queueId);

                        // Finally inspect the results and do something with them.
                        Console.WriteLine(item.Message);

                        // Once we have finished our mission, we close this activity
                        context.CloseActivity();
                        return;
                    }
                }
            }

            // We should not get here if everything works as we expect.
            // So, throw an exception so the workflow will be terminated.
            throw new InvalidOperationException("The queue was missing or contained an unexpected item.");
        }
    }

Try creating a workflow that contains a Parallel activity and add this activity to both branches. On its own, the Parallel activity only interleaves its branches on a single thread, so if you really want work to run in parallel, this is how it needs to be done.
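
If you would rather build that test workflow in code than in the designer, here is a rough sketch. The class name ParallelDemoWorkflow and all of the activity names are my own placeholders, and I am assuming a plain sequential workflow; treat it as a starting point rather than the one true way to wire it up.

```csharp
using System.Workflow.Activities;

// Hypothetical workflow used only to try out DoSomethingAsyncActivity.
public class ParallelDemoWorkflow : SequentialWorkflowActivity
{
    public ParallelDemoWorkflow()
    {
        // Same bracket the designer-generated InitializeComponent uses
        // around changes to the activity tree.
        CanModifyActivities = true;

        // Activity names must be unique within the workflow.
        DoSomethingAsyncActivity left = new DoSomethingAsyncActivity();
        left.Name = "doSomethingLeft";
        DoSomethingAsyncActivity right = new DoSomethingAsyncActivity();
        right.Name = "doSomethingRight";

        // Each branch of a Parallel activity is a Sequence activity.
        SequenceActivity leftBranch = new SequenceActivity();
        leftBranch.Name = "leftBranch";
        leftBranch.Activities.Add(left);

        SequenceActivity rightBranch = new SequenceActivity();
        rightBranch.Name = "rightBranch";
        rightBranch.Activities.Add(right);

        ParallelActivity parallel = new ParallelActivity();
        parallel.Name = "parallel";
        parallel.Activities.Add(leftBranch);
        parallel.Activities.Add(rightBranch);

        Activities.Add(parallel);
        Name = "ParallelDemoWorkflow";

        CanModifyActivities = false;
    }
}
```

If the two fake web service calls really do overlap, both messages should show up after roughly 10 seconds instead of 20.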

Don't forget to add the service to the runtime!
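
For a console host, registering the service could look something like the sketch below. MyWorkflow is a hypothetical placeholder for whatever workflow class contains the activity; the rest follows the usual console host shape, so adjust it to your own hosting setup.

```csharp
using System;
using System.Threading;
using System.Workflow.Runtime;

class Program
{
    static void Main()
    {
        using (WorkflowRuntime runtime = new WorkflowRuntime())
        {
            // Register the service before creating any workflow instances;
            // the activity looks it up through GetService when it executes.
            runtime.AddService(new DoSomethingAsyncService());

            AutoResetEvent done = new AutoResetEvent(false);
            runtime.WorkflowCompleted += delegate(object sender, WorkflowCompletedEventArgs e)
            {
                done.Set();
            };
            runtime.WorkflowTerminated += delegate(object sender, WorkflowTerminatedEventArgs e)
            {
                // Reached if the activity throws, for example.
                Console.WriteLine(e.Exception.Message);
                done.Set();
            };

            // MyWorkflow is a placeholder (assumption): substitute your own workflow type.
            WorkflowInstance instance = runtime.CreateWorkflow(typeof(MyWorkflow));
            instance.Start();

            // Keep the host alive until the workflow completes or terminates.
            done.WaitOne();
        }
    }
}
```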

Happy flowing!