Azure Event Hub / Service Bus Performance - Http Batch Send 2
After the previous post on Http Batch Send for Service Bus I received some emails with questions. I looked at the MSDN documentation for both Service Bus and Event Hub and noticed that some information is missing. The Event Hub documentation does not even mention Batch Send. So here is another post about Batch Send.
Does Event Hub support batch send?
YES!
Azure Event Hub was developed by Microsoft to support IoT scenarios. It was written by the same team that owns Azure Service Bus (that is why it's under the Service Bus namespace), so the two share a big part of their codebase. As a matter of fact, I worked on the Batch Send feature to improve the throughput of the Http code path for Event Hub, and as we were working on it we made sure it worked for Service Bus as well.
Since I’m serializing as JSON do I need to change something in the Receive?
Nothing changes in the receive path. JSON is only used to create the batches of messages; in the receive code path you receive the messages separately. The batch is gone by then.
Can you show a few samples on how to send different payloads?
Like I said in my previous post, this is the format for an Http message:

    public class HttpBrokeredMessage
    {
        public string Body { get; set; }
        public Dictionary<string, object> BrokerProperties { get; set; }
        public bool IsBodyBase64 { get; set; }
        public Dictionary<string, object> UserProperties { get; set; }
    }
You don’t need to send the fields that you don’t use. So if you are only sending a text payload you can just send a message like this:
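
    // "request" is the HttpWebRequest for the send, as in the other samples.
    using (var requestStream = request.GetRequestStream())
    {
        // UTF8 keeps non-ASCII text intact; Encoding.Default depends on the machine's code page.
        var bodyBytes = Encoding.UTF8.GetBytes("[{\"Body\":\"Message1\"}]");
        requestStream.Write(bodyBytes, 0, bodyBytes.Length);
    }
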
This is equivalent to the Non-Batch Http Send. The receive code path does not change. You would just receive it like you would receive a regular text message (no need to deserialize, we default to text serialization).
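The snippets in this post start from a request object that already exists. As a minimal sketch of how such a request might be set up: the content type is the one named in the debugging section of this post, while the class name, the method names, the entity URL and the SAS token parameter are placeholders I made up for illustration, so adjust them to your own namespace and authentication.

```csharp
using System;
using System.Net;
using System.Text;

public static class BatchSendSketch
{
    // Content type that routes the request through the Batch Send pipeline.
    public const string BatchContentType = "application/vnd.microsoft.servicebus.json";

    // Builds (but does not send) a POST request for a batch payload.
    // entityUrl and sasToken are placeholders supplied by the caller.
    public static HttpWebRequest CreateBatchRequest(string entityUrl, string sasToken)
    {
        var request = (HttpWebRequest)WebRequest.Create(entityUrl);
        request.Method = "POST";
        request.ContentType = BatchContentType;
        request.Headers[HttpRequestHeader.Authorization] = sasToken;
        return request;
    }

    // Writes the JSON array to the request body and returns the status code.
    public static HttpStatusCode SendBatch(HttpWebRequest request, string jsonBatch)
    {
        byte[] bodyBytes = Encoding.UTF8.GetBytes(jsonBatch);
        using (var requestStream = request.GetRequestStream())
        {
            requestStream.Write(bodyBytes, 0, bodyBytes.Length);
        }
        using (var response = (HttpWebResponse)request.GetResponse())
        {
            return response.StatusCode;
        }
    }
}
```

Nothing goes over the wire until SendBatch is called, so CreateBatchRequest can be checked on its own before you point it at a real entity.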
However in the past you have been able to set some Broker Properties in your Http message (such as CorrelationId, SessionId, PartitionKey, TimeToLiveTimeSpan, etc) and we still support that. You can just set those as part of the BrokerProperties Dictionary and we will set them in the BrokeredMessage.

    using (var requestStream = request.GetRequestStream())
    {
        var bodyBytes = Encoding.UTF8.GetBytes(
            @"[{""Body"":""SGVsbG8="",""BrokerProperties"":[{""Name"":""CorrelationId"",""Value"":""32119834-65f3-48c1-b366-619df2e4c400""}]}]");
        requestStream.Write(bodyBytes, 0, bodyBytes.Length);
    }

And again, if you want to set your own properties you can add them to the UserProperties Dictionary and we will populate BrokeredMessage.Properties with them:

    using (var requestStream = request.GetRequestStream())
    {
        var bodyBytes = Encoding.UTF8.GetBytes(
            @"[{""Body"":""SGVsbG8="",""UserProperties"":[{""Name"":""ColorValue"",""Value"":""Black""}]}]");
        requestStream.Write(bodyBytes, 0, bodyBytes.Length);
    }

And finally if you want to send an arbitrary object instead of plain text you can do it by using the IsBodyBase64 setting.
Here is an example of sending an object of a custom class called MyTestObject.
1)

    [Serializable]
    public class MyTestObject
    {
        public string Name { get; set; }
        public DateTime TimeStamp { get; set; }
    }

2)

    var arbitraryObject = new SerializationHelper.MyTestObject()
    {
        Name = msgBodies[0],
        TimeStamp = DateTime.Now
    };
    var jsonBody = PayloadHelper.JsonifyObject(arbitraryObject);

3)

    // Requires: System.IO, System.Runtime.Serialization.Formatters.Binary, Newtonsoft.Json
    // Serializes the object with BinaryFormatter, Base64-encodes the bytes,
    // and wraps them in a one-message JSON batch using Json.NET.
    public static string JsonifyObject(object serializableObject)
    {
        using (var ms = new MemoryStream())
        {
            var bf = new BinaryFormatter();
            bf.Serialize(ms, serializableObject);
            string objectEncodedBytes = Convert.ToBase64String(ms.ToArray());

            var jsonObject = new SerializationHelper.HttpBrokeredMessage()
            {
                Body = objectEncodedBytes,
                IsBodyBase64 = true
            };
            var batch = new[] { jsonObject };
            return JsonConvert.SerializeObject(batch);
        }
    }

As you can see we do the following:
- Define the class of our custom object.
- Create a new object of this class.
- Call JsonifyObject, which creates the message that we will insert into a batch. Inside it we:
  - Use BinaryFormatter (from System.Runtime.Serialization.Formatters.Binary) to serialize our object.
  - Convert the result to a Base64-encoded string (this is the body).
  - Create an HttpBrokeredMessage that has this newly created body and IsBodyBase64 set to true.
  - Serialize this to JSON with Newtonsoft's JsonConvert.
- Now we have a JSON string that contains one message. We can do the same with the rest of our messages and send them as a JSON array "[message1, message2…]".
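The last step above, putting several messages into one JSON array, could look roughly like the sketch below. It makes a few assumptions: the class is a copy of HttpBrokeredMessage from earlier in this post, BatchBuilder and BuildTextBatch are names I made up, and I use System.Text.Json (which needs .NET 5 or later for the option shown) instead of Json.NET only so that the sketch has no package dependency. Fields that are not set are left out of the output, in line with the earlier note that you don't need to send the fields you don't use.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;

// Copy of the message shape shown earlier in this post.
public class HttpBrokeredMessage
{
    public string Body { get; set; }
    public Dictionary<string, object> BrokerProperties { get; set; }
    public bool IsBodyBase64 { get; set; }
    public Dictionary<string, object> UserProperties { get; set; }
}

// Hypothetical helper, not part of any Service Bus library.
public static class BatchBuilder
{
    // Skip null and false members so that unused fields are not sent.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault
    };

    // Wraps each text body in a message and serializes the whole batch as one JSON array.
    public static string BuildTextBatch(IEnumerable<string> bodies)
    {
        var batch = bodies.Select(b => new HttpBrokeredMessage { Body = b }).ToArray();
        return JsonSerializer.Serialize(batch, Options);
    }
}
```

For example, BatchBuilder.BuildTextBatch(new[] { "Message1", "Message2" }) returns [{"Body":"Message1"},{"Body":"Message2"}], which has the same shape as the single-message payload shown earlier.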
The interesting thing about sending binary messages is that when we are receiving them we can just do a Receive<MyTestObject> and the .Net client will do all the hard work of deserialization for us. So the code in the receive path is a lot simpler :)
Accepted types for UserProperties
We accept the same types used for regular Rest Send:
· Boolean values
· Null
· Integers
· Doubles
· Strings
Sample receive code
The receive code path has not changed at all. From the .Net client you can just do a Receive and get back your message. From Http you can use code similar to this:
For Text:

    using (var response = (HttpWebResponse)readRequest.GetResponse())
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            using (var content = response.GetResponseStream())
            {
                var reader = new StreamReader(content);
                string receivedMessage = reader.ReadToEnd();
            }
        }
    }

For Binary objects:

    using (var response = (HttpWebResponse)readRequest.GetResponse())
    {
        if (response.StatusCode == HttpStatusCode.OK)
        {
            using (var content = response.GetResponseStream())
            {
                byte[] receivedBytes = SerializationHelper.ReadBytesFromStream(content);
                object receivedObject = SerializationHelper.ByteArray2Object(receivedBytes);
                var received = (SerializationHelper.MyTestObject)receivedObject;
            }
        }
    }

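The binary sample calls SerializationHelper.ReadBytesFromStream, which is not shown in this post. The sketch below is only my guess at what such a helper might look like (the class name SerializationHelperSketch is made up): it copies the whole response stream into memory and returns the bytes. ByteArray2Object is left out; presumably it mirrors the BinaryFormatter call used on the send side.

```csharp
using System.IO;

// Hypothetical stand-in for the helper used in the samples above.
public static class SerializationHelperSketch
{
    // Reads the stream to its end and returns everything as a byte array.
    public static byte[] ReadBytesFromStream(Stream input)
    {
        using (var buffer = new MemoryStream())
        {
            input.CopyTo(buffer);
            return buffer.ToArray();
        }
    }
}
```

Buffering the whole body is fine for small messages; for large payloads you may prefer to process the stream directly.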
How to debug any problems with Batch Send?
1) If you receive a single IEnumerable object with n elements in it instead of n separate messages, you are not going through the Batch Send pipeline. Double-check that the request content type is correct. It should be "application/vnd.microsoft.servicebus.json".
2) If you get a 400 error when you send, something is wrong with your message. You can use this code to get more information from the web exception:
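
    static string GetErrorFromException(WebException webExcp)
    {
        // Response is null when the failure happened before the server replied (e.g. a timeout).
        var httpResponse = webExcp.Response as HttpWebResponse;
        if (httpResponse == null)
        {
            return webExcp.Message;
        }
        using (var stream = httpResponse.GetResponseStream())
        {
            byte[] receivedBytes = SerializationHelper.ReadBytesFromStream(stream);
            return Encoding.UTF8.GetString(receivedBytes);
        }
    }
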
Comments
Anonymous
October 02, 2015
I've copied your exact code for my event hub. Everything was working for non-batched events, but I haven't been able to get batched working. I wonder if you have any ideas. Here is the essence of what I've done.

    var token = _creds.SASToken;
    var url = string.Format("https://{0}/{1}/publishers/{2}/messages?timeout=60&api-version=2014-01", _creds.Address, _creds.Path, partitionedEvents.Key);
    var httpClient = new HttpClient();
    httpClient.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", token);
    //Your string from above
    var str = new StringContent(@"[{""Body"":""SGVsbG8="",""UserProperties"":[{""Name"":""ColorValue"",""Value"":""Black""}]}]");
    str.Headers.ContentType = new MediaTypeHeaderValue("application/json");
    var resp = await httpClient.PostAsync(url, str);

Anonymous
January 10, 2016
Please see the Debug section for an explanation on why this could be failing.