Asynchronous Parallel Blob Transfers with Progress Change Notification 2.0
This post is an update to the post at https://blogs.msdn.com/b/kwill/archive/2011/05/30/asynchronous-parallel-block-blob-transfers-with-progress-change-notification.aspx.
Improvements from previous version
- Upgraded to Azure Storage Client library 2.0 (Microsoft.WindowsAzure.Storage.dll).
- Switched from custom parallel transfer code to the built-in BeginDownloadToStream and BeginUploadFromStream methods, which provide better performance and more reliable functionality with the same async parallel operations.
- Helper functions to allow clients using Storage Client library 1.7 (Microsoft.WindowsAzure.StorageClient.dll) to utilize the functionality.
Upgrade instructions
The changes were designed to give clients using the older version of the code a drop-in replacement with almost no code changes. If you are upgrading a client to use this new code, there are a few small changes to make:
- Add a reference to Azure Storage Client Library 2.0. The NuGet package manager makes this a near 1-click operation.
- The TransferTypeEnum has been moved into the BlobTransfer class. If your client code utilizes TransferTypeEnum, update it to use BlobTransfer.TransferTypeEnum.
BlobTransfer.cs
using System;
using System.ComponentModel;
using System.Collections.Generic;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
using System.Linq;
namespace BlobTransferUI
{
// Class to allow for easy async upload and download functions with progress change notifications
// Requires references to Microsoft.WindowsAzure.Storage.dll (Storage client 2.0) and Microsoft.WindowsAzure.StorageClient.dll (Storage client 1.7).
// See comments on UploadBlobAsync and DownloadBlobAsync functions for information on removing the 1.7 client library dependency
class BlobTransfer
{
// Public async events
public event AsyncCompletedEventHandler TransferCompleted;
public event EventHandler<BlobTransferProgressChangedEventArgs> TransferProgressChanged;
// Public BlobTransfer properties
public TransferTypeEnum TransferType;
// Private variables
private ICancellableAsyncResult asyncresult;
private bool Working = false;
private object WorkingLock = new object();
private AsyncOperation asyncOp;
// Used to calculate download speeds
private Queue<long> timeQueue = new Queue<long>(200);
private Queue<long> bytesQueue = new Queue<long>(200);
private DateTime updateTime = System.DateTime.Now;
// Private BlobTransfer properties
private string m_FileName;
private ICloudBlob m_Blob;
// Helper function to allow Storage Client 1.7 (Microsoft.WindowsAzure.StorageClient) to utilize this class.
// Remove this function if only using Storage Client 2.0 (Microsoft.WindowsAzure.Storage).
public void UploadBlobAsync(Microsoft.WindowsAzure.StorageClient.CloudBlob blob, string LocalFile)
{
Microsoft.WindowsAzure.StorageCredentialsAccountAndKey account = blob.ServiceClient.Credentials as Microsoft.WindowsAzure.StorageCredentialsAccountAndKey;
ICloudBlob blob2 = new CloudBlockBlob(blob.Attributes.Uri, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(blob.ServiceClient.Credentials.AccountName, account.Credentials.ExportBase64EncodedKey()));
UploadBlobAsync(blob2, LocalFile);
}
// Helper function to allow Storage Client 1.7 (Microsoft.WindowsAzure.StorageClient) to utilize this class.
// Remove this function if only using Storage Client 2.0 (Microsoft.WindowsAzure.Storage).
public void DownloadBlobAsync(Microsoft.WindowsAzure.StorageClient.CloudBlob blob, string LocalFile)
{
Microsoft.WindowsAzure.StorageCredentialsAccountAndKey account = blob.ServiceClient.Credentials as Microsoft.WindowsAzure.StorageCredentialsAccountAndKey;
ICloudBlob blob2 = new CloudBlockBlob(blob.Attributes.Uri, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(blob.ServiceClient.Credentials.AccountName, account.Credentials.ExportBase64EncodedKey()));
DownloadBlobAsync(blob2, LocalFile);
}
public void UploadBlobAsync(ICloudBlob blob, string LocalFile)
{
// The class currently stores state in class level variables so calling UploadBlobAsync or DownloadBlobAsync a second time will cause problems.
// A better long term solution would be to better encapsulate the state, but the current solution works for the needs of my primary client.
// Throw an exception if UploadBlobAsync or DownloadBlobAsync has already been called.
lock (WorkingLock)
{
if (!Working)
Working = true;
else
throw new Exception("BlobTransfer already initiated. Create new BlobTransfer object to initiate a new file transfer.");
}
// Attempt to open the file first so that we throw an exception before getting into the async work
using (FileStream fstemp = new FileStream(LocalFile, FileMode.Open, FileAccess.Read)) { }
// Create an async op in order to raise the events back to the client on the correct thread.
asyncOp = AsyncOperationManager.CreateOperation(blob);
TransferType = TransferTypeEnum.Upload;
m_Blob = blob;
m_FileName = LocalFile;
var file = new FileInfo(m_FileName);
long fileSize = file.Length;
FileStream fs = new FileStream(m_FileName, FileMode.Open, FileAccess.Read, FileShare.Read);
ProgressStream pstream = new ProgressStream(fs);
pstream.ProgressChanged += pstream_ProgressChanged;
pstream.SetLength(fileSize);
m_Blob.ServiceClient.ParallelOperationThreadCount = 10;
asyncresult = m_Blob.BeginUploadFromStream(pstream, BlobTransferCompletedCallback, new BlobTransferAsyncState(m_Blob, pstream));
}
public void DownloadBlobAsync(ICloudBlob blob, string LocalFile)
{
// The class currently stores state in class level variables so calling UploadBlobAsync or DownloadBlobAsync a second time will cause problems.
// A better long term solution would be to better encapsulate the state, but the current solution works for the needs of my primary client.
// Throw an exception if UploadBlobAsync or DownloadBlobAsync has already been called.
lock (WorkingLock)
{
if (!Working)
Working = true;
else
throw new Exception("BlobTransfer already initiated. Create new BlobTransfer object to initiate a new file transfer.");
}
// Create an async op in order to raise the events back to the client on the correct thread.
asyncOp = AsyncOperationManager.CreateOperation(blob);
TransferType = TransferTypeEnum.Download;
m_Blob = blob;
m_FileName = LocalFile;
m_Blob.FetchAttributes();
FileStream fs = new FileStream(m_FileName, FileMode.OpenOrCreate, FileAccess.Write, FileShare.Read);
ProgressStream pstream = new ProgressStream(fs);
pstream.ProgressChanged += pstream_ProgressChanged;
pstream.SetLength(m_Blob.Properties.Length);
m_Blob.ServiceClient.ParallelOperationThreadCount = 10;
asyncresult = m_Blob.BeginDownloadToStream(pstream, BlobTransferCompletedCallback, new BlobTransferAsyncState(m_Blob, pstream));
}
private void pstream_ProgressChanged(object sender, ProgressChangedEventArgs e)
{
BlobTransferProgressChangedEventArgs eArgs = null;
int progress = (int)((double)e.BytesRead / e.TotalLength * 100);
// raise the progress changed event on the asyncop thread
eArgs = new BlobTransferProgressChangedEventArgs(e.BytesRead, e.TotalLength, progress, CalculateSpeed(e.BytesRead), null);
asyncOp.Post(delegate(object e2) { OnTaskProgressChanged((BlobTransferProgressChangedEventArgs)e2); }, eArgs);
}
private void BlobTransferCompletedCallback(IAsyncResult result)
{
BlobTransferAsyncState state = (BlobTransferAsyncState)result.AsyncState;
ICloudBlob blob = state.Blob;
ProgressStream stream = (ProgressStream)state.Stream;
try
{
stream.Close();
// End the operation.
if (TransferType == TransferTypeEnum.Download)
blob.EndDownloadToStream(result);
else if (TransferType == TransferTypeEnum.Upload)
blob.EndUploadFromStream(result);
// Operation completed normally, raise the completed event
AsyncCompletedEventArgs completedArgs = new AsyncCompletedEventArgs(null, false, null);
asyncOp.PostOperationCompleted(delegate(object e) { OnTaskCompleted((AsyncCompletedEventArgs)e); }, completedArgs);
}
catch (StorageException ex)
{
if (!state.Cancelled)
{
// Rethrow without resetting the stack trace
throw;
}
// Operation was cancelled, raise the event with the cancelled flag = true
AsyncCompletedEventArgs completedArgs = new AsyncCompletedEventArgs(null, true, null);
asyncOp.PostOperationCompleted(delegate(object e) { OnTaskCompleted((AsyncCompletedEventArgs)e); }, completedArgs);
}
}
// Cancel the async transfer (upload or download)
public void CancelAsync()
{
((BlobTransferAsyncState)asyncresult.AsyncState).Cancelled = true;
asyncresult.Cancel();
}
// Helper function to only raise the event if the client has subscribed to it.
protected virtual void OnTaskCompleted(AsyncCompletedEventArgs e)
{
if (TransferCompleted != null)
TransferCompleted(this, e);
}
// Helper function to only raise the event if the client has subscribed to it.
protected virtual void OnTaskProgressChanged(BlobTransferProgressChangedEventArgs e)
{
if (TransferProgressChanged != null)
TransferProgressChanged(this, e);
}
// Keep the last 200 progress change notifications and use them to calculate the average speed over that duration.
private double CalculateSpeed(long BytesSent)
{
double speed = 0;
if (timeQueue.Count >= 200)
{
timeQueue.Dequeue();
bytesQueue.Dequeue();
}
timeQueue.Enqueue(System.DateTime.Now.Ticks);
bytesQueue.Enqueue(BytesSent);
if (timeQueue.Count > 2)
{
updateTime = System.DateTime.Now;
speed = (bytesQueue.Max() - bytesQueue.Min()) / TimeSpan.FromTicks(timeQueue.Max() - timeQueue.Min()).TotalSeconds;
}
return speed;
}
// A modified version of the ProgressStream from https://blogs.msdn.com/b/paolos/archive/2010/05/25/large-message-transfer-with-wcf-adapters-part-1.aspx
// This class allows progress changed events to be raised from the blob upload/download.
private class ProgressStream : Stream
{
#region Private Fields
private Stream stream;
private long bytesTransferred;
private long totalLength;
#endregion
#region Public Handler
public event EventHandler<ProgressChangedEventArgs> ProgressChanged;
#endregion
#region Public Constructor
public ProgressStream(Stream file)
{
this.stream = file;
this.totalLength = file.Length;
this.bytesTransferred = 0;
}
#endregion
#region Public Properties
public override bool CanRead
{
get
{
return this.stream.CanRead;
}
}
public override bool CanSeek
{
get
{
return this.stream.CanSeek;
}
}
public override bool CanWrite
{
get
{
return this.stream.CanWrite;
}
}
public override void Flush()
{
this.stream.Flush();
}
public override void Close()
{
this.stream.Close();
}
public override long Length
{
get
{
return this.stream.Length;
}
}
public override long Position
{
get
{
return this.stream.Position;
}
set
{
this.stream.Position = value;
}
}
#endregion
#region Public Methods
public override int Read(byte[] buffer, int offset, int count)
{
int result = stream.Read(buffer, offset, count);
bytesTransferred += result;
if (ProgressChanged != null)
{
try
{
OnProgressChanged(new ProgressChangedEventArgs(bytesTransferred, totalLength));
//ProgressChanged(this, new ProgressChangedEventArgs(bytesTransferred, totalLength));
}
catch (Exception)
{
ProgressChanged = null;
}
}
return result;
}
protected virtual void OnProgressChanged(ProgressChangedEventArgs e)
{
if (ProgressChanged != null)
ProgressChanged(this, e);
}
public override long Seek(long offset, SeekOrigin origin)
{
return this.stream.Seek(offset, origin);
}
public override void SetLength(long value)
{
totalLength = value;
//this.stream.SetLength(value);
}
public override void Write(byte[] buffer, int offset, int count)
{
this.stream.Write(buffer, offset, count);
bytesTransferred += count;
if (ProgressChanged != null)
{
try
{
OnProgressChanged(new ProgressChangedEventArgs(bytesTransferred, totalLength));
//ProgressChanged(this, new ProgressChangedEventArgs(bytesTransferred, totalLength));
}
catch (Exception)
{
ProgressChanged = null;
}
}
}
protected override void Dispose(bool disposing)
{
// Only release the wrapped stream when called from Dispose(), not from a finalizer
if (disposing)
stream.Dispose();
base.Dispose(disposing);
}
#endregion
}
private class BlobTransferAsyncState
{
public ICloudBlob Blob;
public Stream Stream;
public DateTime Started;
public bool Cancelled;
public BlobTransferAsyncState(ICloudBlob blob, Stream stream)
: this(blob, stream, DateTime.Now)
{ }
public BlobTransferAsyncState(ICloudBlob blob, Stream stream, DateTime started)
{
Blob = blob;
Stream = stream;
Started = started;
Cancelled = false;
}
}
private class ProgressChangedEventArgs : EventArgs
{
#region Private Fields
private long bytesRead;
private long totalLength;
#endregion
#region Public Constructor
public ProgressChangedEventArgs(long bytesRead, long totalLength)
{
this.bytesRead = bytesRead;
this.totalLength = totalLength;
}
#endregion
#region Public properties
public long BytesRead
{
get
{
return this.bytesRead;
}
set
{
this.bytesRead = value;
}
}
public long TotalLength
{
get
{
return this.totalLength;
}
set
{
this.totalLength = value;
}
}
#endregion
}
public enum TransferTypeEnum
{
Download,
Upload
}
public class BlobTransferProgressChangedEventArgs : System.ComponentModel.ProgressChangedEventArgs
{
private long m_BytesSent = 0;
private long m_TotalBytesToSend = 0;
private double m_Speed = 0;
public long BytesSent
{
get { return m_BytesSent; }
}
public long TotalBytesToSend
{
get { return m_TotalBytesToSend; }
}
public double Speed
{
get { return m_Speed; }
}
public TimeSpan TimeRemaining
{
get
{
TimeSpan time = new TimeSpan(0, 0, (int)((TotalBytesToSend - m_BytesSent) / (m_Speed == 0 ? 1 : m_Speed)));
return time;
}
}
public BlobTransferProgressChangedEventArgs(long BytesSent, long TotalBytesToSend, int progressPercentage, double Speed, object userState)
: base(progressPercentage, userState)
{
m_BytesSent = BytesSent;
m_TotalBytesToSend = TotalBytesToSend;
m_Speed = Speed;
}
}
}
}
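The Speed value reported in the progress events comes from CalculateSpeed, which averages over the most recent 200 notifications. The following is a minimal standalone sketch of that moving-window idea, not the code used by BlobTransfer. SpeedWindow and AddSample are hypothetical names, timestamps are passed in explicitly so the behavior is easy to check, and the sketch compares the oldest and newest samples instead of calling Max and Min, which assumes the byte counts and timestamps only ever increase.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of BlobTransfer): keeps a fixed number of
// (timestamp, total bytes) samples and estimates the average speed over them.
public class SpeedWindow
{
    private readonly int capacity;
    private readonly Queue<long> ticks = new Queue<long>();
    private readonly Queue<long> bytes = new Queue<long>();

    public SpeedWindow(int capacity)
    {
        this.capacity = capacity;
    }

    // Records a sample and returns bytes per second across the window.
    // Returns 0 when there are fewer than two samples or no time has elapsed.
    public double AddSample(long timestampTicks, long totalBytesSoFar)
    {
        // Drop the oldest sample once the window is full.
        if (ticks.Count >= capacity)
        {
            ticks.Dequeue();
            bytes.Dequeue();
        }
        ticks.Enqueue(timestampTicks);
        bytes.Enqueue(totalBytesSoFar);

        if (ticks.Count < 2)
            return 0;

        // Samples arrive in order, so Peek() is the oldest one in the window.
        double seconds = TimeSpan.FromTicks(timestampTicks - ticks.Peek()).TotalSeconds;
        if (seconds <= 0)
            return 0;

        return (totalBytesSoFar - bytes.Peek()) / seconds;
    }
}
```

A caller would feed it something like AddSample(DateTime.Now.Ticks, bytesTransferred) on each progress notification. A smaller window reacts faster to changes in throughput, while a larger one gives a steadier number.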
Sample usage
BlobTransfer transfer;
private void button1_Click(object sender, EventArgs e)
{
CloudStorageAccount account = new CloudStorageAccount(new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials("accountname", "accountkey"), false);
CloudBlobClient client = account.CreateCloudBlobClient();
CloudBlobContainer container = client.GetContainerReference("container");
CloudBlockBlob blob = container.GetBlockBlobReference("file");
transfer = new BlobTransfer();
transfer.TransferProgressChanged += transfer_TransferProgressChanged;
transfer.TransferCompleted += transfer_TransferCompleted;
transfer.DownloadBlobAsync(blob, @"C:\temp\file");
}
private void button2_Click(object sender, EventArgs e)
{
transfer.CancelAsync();
}
void transfer_TransferCompleted(object sender, AsyncCompletedEventArgs e)
{
System.Diagnostics.Debug.WriteLine("Completed. Cancelled = " + e.Cancelled);
}
void transfer_TransferProgressChanged(object sender, BlobTransfer.BlobTransferProgressChangedEventArgs e)
{
System.Diagnostics.Debug.WriteLine("Changed - " + e.BytesSent + " / " + e.TotalBytesToSend + " = " + e.ProgressPercentage + "% " + e.Speed);
}
Simple Console Client
Calling the upload or download method from BlobTransfer is a simple matter of obtaining a CloudBlob reference to the blob of interest, subscribing to the TransferProgressChanged and TransferCompleted events, and then calling UploadBlobAsync or DownloadBlobAsync. The following console app shows a simple example. It uses the Storage Client 1.7 types (Microsoft.WindowsAzure.StorageClient), so the calls go through the 1.7 helper overloads in BlobTransfer.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;
namespace ConsoleApplication1
{
class Program
{
const string ACCOUNTNAME = "ENTER ACCOUNT NAME";
const string ACCOUNTKEY = "ENTER ACCOUNT KEY";
const string LOCALFILE = @"ENTER LOCAL FILE";
const string CONTAINER = "temp";
private static CloudStorageAccount AccountFileTransfer;
private static CloudBlobClient BlobClientFileTransfer;
private static CloudBlobContainer ContainerFileTransfer;
// Written from the completion callback and read from the main thread
private static volatile bool Transferring;
static void Main(string[] args)
{
System.Net.ServicePointManager.DefaultConnectionLimit = 35;
AccountFileTransfer = CloudStorageAccount.Parse("DefaultEndpointsProtocol=http;AccountName=" + ACCOUNTNAME + ";AccountKey=" + ACCOUNTKEY);
if (AccountFileTransfer != null)
{
BlobClientFileTransfer = AccountFileTransfer.CreateCloudBlobClient();
ContainerFileTransfer = BlobClientFileTransfer.GetContainerReference(CONTAINER);
ContainerFileTransfer.CreateIfNotExist();
}
// Upload the file
CloudBlob blobUpload = ContainerFileTransfer.GetBlobReference(CONTAINER + "/" + System.IO.Path.GetFileName(LOCALFILE));
BlobTransfer transferUpload = new BlobTransfer();
transferUpload.TransferProgressChanged += new EventHandler<BlobTransfer.BlobTransferProgressChangedEventArgs>(transfer_TransferProgressChanged);
transferUpload.TransferCompleted += new System.ComponentModel.AsyncCompletedEventHandler(transfer_TransferCompleted);
// Set the flag before starting so that a fast completion cannot be overwritten
Transferring = true;
transferUpload.UploadBlobAsync(blobUpload, LOCALFILE);
while (Transferring)
{
Console.ReadLine();
}
// Download the file
CloudBlob blobDownload = ContainerFileTransfer.GetBlobReference(CONTAINER + "/" + System.IO.Path.GetFileName(LOCALFILE));
BlobTransfer transferDownload = new BlobTransfer();
transferDownload.TransferProgressChanged += new EventHandler<BlobTransfer.BlobTransferProgressChangedEventArgs>(transfer_TransferProgressChanged);
transferDownload.TransferCompleted += new System.ComponentModel.AsyncCompletedEventHandler(transfer_TransferCompleted);
// Set the flag before starting so that a fast completion cannot be overwritten
Transferring = true;
transferDownload.DownloadBlobAsync(blobDownload, LOCALFILE + ".copy");
while (Transferring)
{
Console.ReadLine();
}
}
static void transfer_TransferCompleted(object sender, System.ComponentModel.AsyncCompletedEventArgs e)
{
Transferring = false;
Console.WriteLine("Transfer completed. Press Enter to continue.");
}
static void transfer_TransferProgressChanged(object sender, BlobTransfer.BlobTransferProgressChangedEventArgs e)
{
Console.WriteLine("Transfer progress percentage = " + e.ProgressPercentage + " - " + (e.Speed / 1024).ToString("N2") + "KB/s");
}
}
}
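The console client above waits for each transfer by checking a flag around Console.ReadLine, so the user has to press Enter after each transfer finishes. As an alternative, here is a minimal sketch that blocks until the completion event fires. TransferWaiter and RunAndWait are hypothetical names, not part of BlobTransfer. The sketch assumes the completion event is raised on a different thread from the one that is waiting, which should be the case in a console app but not on a UI thread, where blocking like this would likely deadlock.

```csharp
using System;
using System.ComponentModel;
using System.Threading;

// Hypothetical helper (not part of BlobTransfer): starts an event-based
// async operation and blocks the calling thread until it reports completion.
public static class TransferWaiter
{
    // subscribe: attaches the supplied handler to the completion event.
    // start:     kicks off the async operation.
    // timeout:   maximum time to wait before giving up.
    public static AsyncCompletedEventArgs RunAndWait(
        Action<AsyncCompletedEventHandler> subscribe,
        Action start,
        TimeSpan timeout)
    {
        // Deliberately not disposed: a late completion after a timeout
        // could otherwise call Set() on a disposed event.
        var done = new ManualResetEventSlim(false);
        AsyncCompletedEventArgs result = null;

        // Subscribe before starting so a fast completion is not missed.
        subscribe((sender, e) =>
        {
            result = e;
            done.Set();
        });
        start();

        if (!done.Wait(timeout))
            throw new TimeoutException("The operation did not complete in time.");

        return result;
    }
}
```

With the console client's variables, a call might look like TransferWaiter.RunAndWait(h => transferUpload.TransferCompleted += h, () => transferUpload.UploadBlobAsync(blobUpload, LOCALFILE), TimeSpan.FromHours(1)), after which the Cancelled property of the returned args can be inspected.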
UI Client
For a more full-featured UI client, check out the full source code at 0601.BlobTransferUI.zip.
Comments
Anonymous
March 05, 2013
I think you need the below to prevent simple recursion by user actions:
private void button1_Click(object sender, EventArgs e)
{
button1.Enabled(false);
CloudStorageAccount account = new CloudStorageAccount(new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials("accountname", "accountkey"), false);
...
transfer.DownloadBlobAsync(blob, @"C:\temp\file");
button1.Enabled(true);
}
Anonymous
March 05, 2013
RLH, thanks for the comment. A few points to address your concern:
- The code you propose won't block the button from getting clicked multiple times. The DownloadBlobAsync call will return immediately, which means that your button1.Enabled(true) will also get called immediately and re-enable the button while the download is still happening. If you really wanted to disable the button then you would want to re-enable it in the TransferCompleted event.
- I would expect that a real client application would want to allow multiple simultaneous uploads and downloads, similar to the UI sample included in this blog post. The mechanism to prevent uploads/downloads to the same file or blob would have to be more complex than just disabling a button.
- The goal with this blog post was to provide the BlobTransfer.cs code, not necessarily to provide a full featured client application :) Kevin
Anonymous
March 09, 2013
Hi Kevin, thanks for this code; it has been very useful in many situations. I've just posted a tool that uses it to upload and download hundreds of files here: www.vic.ms/.../blob-transfer-utility. Only when I was referencing you did I see that you had made updates and a GUI as well (I'm already using the latest version). Thank you again, Vitor Ciaramella (from Microsoft Brazil)
Anonymous
April 08, 2013
The comment has been removed
Anonymous
April 09, 2013
Dennis, can you expand a bit more on the changes you made and how you are calling the BlobTransfer class? The code works as-is, so there has to be something in your implementation which is causing this issue.
Anonymous
May 06, 2013
The comment has been removed
Anonymous
June 03, 2013
The comment has been removed
Anonymous
June 03, 2013
The comment has been removed
Anonymous
July 23, 2013
Does parallelism actually work for file downloads? I see several signs that it does not, but I am not sure. I'm testing this by downloading a 100MB file from Azure Storage. Why I doubt that it works:
- The documentation for ParallelOperationThreadCount states "Gets or sets the number of blocks that may be simultaneously uploaded when uploading a blob". In other words, according to the MSDN-documentation it only works for upload.
- I've downloaded the Azure Storage source code from GitHub. Searching for usages of ParallelOperationThreadCount shows that it's only used in UploadFromStream and BeginUploadFromStream. No traces of it being used for downloads.
- If I use Fiddler when running the above example, I only see a single request being made. I would expect to see several chunked requests.
- I've downloaded a single 100MB file from Azure Storage several times, and I do not see any performance difference if I change ParallelOperationThreadCount from 10 to 1.
Anonymous
November 20, 2013
Hi Kevin, thanks for this code. It is very useful. I am new to Windows Azure. In my application we have basic upload/download functionality implemented. The next step for us is to implement a download manager with advanced features such as start, pause, and resume of an upload/download in case of network failure. Is it feasible to have a download manager with these advanced features? If yes, can you please share sample code? Thank you. Regards --Pragya
Anonymous
February 20, 2014
Any idea how to do this using MVC API Controllers? I've seen async controllers but have no clue how to wire them up with your implementation.
Anonymous
April 06, 2014
Not clear to me how to start a download from the UI. Is it implemented?
Anonymous
April 06, 2014
Sorry, I now understand that the UI loads blobs from the container specified in the code. I have another big issue, which I also see with Vitor Ciaramella's tool, so I think it's an issue in the base class: when the source is huge (>70GB), the download starts very fast but gets stuck indefinitely after about 1GB has been downloaded.
Anonymous
April 06, 2014
Debugging the code, I see that BlobTransferCompletedCallback is called before the transfer has actually completed. What could be the reason?
Anonymous
April 07, 2014
I've added these few lines to the DownloadBlobAsync method:
m_Blob.ServiceClient.ServerTimeout = new TimeSpan(0, 30, 0);
m_Blob.ServiceClient.MaximumExecutionTime = new TimeSpan(12, 0, 0);
m_Blob.ServiceClient.RetryPolicy = new LinearRetry(new TimeSpan(0, 1, 0), 5);
With these changes the above issue seems to be reduced (though not removed completely).
Anonymous
November 27, 2015
Any chance of getting this updated any time soon? The sample code will no longer compile with current NuGet packages.
Anonymous
February 29, 2016
Hi, thanks for the code! I implemented it in a project, and everything is working except the cancellation. I can see that asyncresult.Cancel(); is called, but the transfer keeps going. Is there some additional code to add to get it working? Thanks!
Anonymous
November 21, 2016
Would this now be replaced by the following library, which seems to be headed toward becoming part of the SDK? https://github.com/Azure/azure-storage-net-data-movement
Anonymous
December 21, 2016
The comment has been removed