Tutorial: Process Apache Kafka for Event Hubs events using Stream analytics
This article shows how to stream data into Event Hubs and process it with Azure Stream Analytics. It walks you through the following steps:
- Create an Event Hubs namespace.
- Create a Kafka client that sends messages to the event hub.
- Create a Stream Analytics job that copies data from the event hub into an Azure blob storage.
You do not need to change your protocol clients or run your own clusters when you use the Kafka endpoint exposed by an event hub. Azure Event Hubs supports Apache Kafka version 1.0. and above.
Prerequisites
To complete this quickstart, make sure you have the following prerequisites:
- An Azure subscription. If you do not have one, create a free account before you begin.
- Java Development Kit (JDK) 1.7+.
- Download and install a Maven binary archive.
- Git
- An Azure Storage account. If you don't have one, create one before proceeding further. The Stream Analytics job in this walkthrough stores the output data in an Azure blob storage.
Create an Event Hubs namespace
When you create an Event Hubs namespace, the Kafka endpoint for the namespace is automatically enabled. You can stream events from your applications that use the Kafka protocol into event hubs. Follow step-by-step instructions in the Create an event hub using Azure portal to create an Event Hubs namespace. If you are using a dedicated cluster, see Create a namespace and event hub in a dedicated cluster.
Note
Event Hubs for Kafka isn't supported in the basic tier.
Send messages with Kafka in Event Hubs
Clone the Azure Event Hubs for Kafka repository to your machine.
Navigate to the folder:
azure-event-hubs-for-kafka/quickstart/java/producer
.Update the configuration details for the producer in
src/main/resources/producer.config
. Specify the name and connection string for the event hub namespace.bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
Navigate to
azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/
, and open TestDataReporter.java file in an editor of your choice.Comment out the following line of code:
//final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
Add the following line of code in place of the commented code:
final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");
This code sends the event data in JSON format. When you configure input for a Stream Analytics job, you specify JSON as the format for the input data.
Run the producer and stream into Event Hubs. On a Windows machine, when using a Node.js command prompt, switch to the
azure-event-hubs-for-kafka/quickstart/java/producer
folder before running these commands.mvn clean package mvn exec:java -Dexec.mainClass="TestProducer"
Verify that event hub receives the data
Select Event Hubs under ENTITIES. Confirm that you see an event hub named test.
Confirm that you see messages coming in to the event hub.
Process event data using a Stream Analytics job
In this section, you create an Azure Stream Analytics job. The Kafka client sends events to the event hub. You create a Stream Analytics job that takes event data as input and outputs it to an Azure blob storage. If you don't have an Azure Storage account, create one.
The query in the Stream Analytics job passes through the data without performing any analytics. You can create a query that transforms the input data to produce output data in a different format or with gained insights.
Create a Stream Analytics job
- Select + Create a resource in the Azure portal.
- Select Analytics in the Azure Marketplace menu, and select Stream Analytics job.
- On the New Stream Analytics page, do the following actions:
Enter a name for the job.
Select your subscription.
Select Create new for the resource group and enter the name. You can also use an existing resource group.
Select a location for the job.
Select Create to create the job.
Configure job input
In the notification message, select Go to resource to see the Stream Analytics job page.
Select Inputs in the JOB TOPOLOGY section on the left menu.
Select Add stream input, and then select Event Hub.
On the Event Hub input configuration page, do the following actions:
Specify an alias for the input.
Select your Azure subscription.
Select the event hub namespace your created earlier.
Select test for the event hub.
Select Save.
Configure job output
- Select Outputs in the JOB TOPOLOGY section on the menu.
- Select + Add on the toolbar, and select Blob storage
- On the Blob storage output settings page, do the following actions:
Specify an alias for the output.
Select your Azure subscription.
Select your Azure Storage account.
Enter a name for the container that stores the output data from the Stream Analytics query.
Select Save.
Define a query
After you have a Stream Analytics job setup to read an incoming data stream, the next step is to create a transformation that analyzes data in real time. You define the transformation query by using Stream Analytics Query Language. In this walkthrough, you define a query that passes through the data without performing any transformation.
Select Query.
In the query window, replace
[YourOutputAlias]
with the output alias you created earlier.Replace
[YourInputAlias]
with the input alias you created earlier.Select Save on the toolbar.
Run the Stream Analytics job
Select Overview on the left menu.
Select Start.
On the Start job page, select Start.
Wait until the status of the job changes from Starting to running.
Test the scenario
Run the Kafka producer again to send events to the event hub.
mvn exec:java -Dexec.mainClass="TestProducer"
Confirm that you see output data is generated in the Azure blob storage. You see a JSON file in the container with 100 rows that look like the following sample rows:
{"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"} {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
The Azure Stream Analytics job received input data from the event hub and stored it in the Azure blob storage in this scenario.
Next steps
In this article, you learned how to stream into Event Hubs without changing your protocol clients or running your own clusters. To learn more about Event Hubs for Apache Kafka, see Apache Kafka developer guide for Azure Event Hubs.