Quickstart: Create Apache Kafka cluster in Azure HDInsight using PowerShell
Apache Kafka is an open-source, distributed streaming platform. It's often used as a message broker, as it provides functionality similar to a publish-subscribe message queue.
In this quickstart, you learn how to create an Apache Kafka cluster using Azure PowerShell. You also learn how to use included utilities to send and receive messages using Kafka.
Warning
Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.
Only resources within the same virtual network have access to the Kafka API. In this quickstart, you access the cluster directly using SSH. To connect other services, networks, or virtual machines to Kafka, you must first create a virtual network and then create the resources within the network. For more information, see the Connect to Apache Kafka using a virtual network document.
If you don't have an Azure subscription, create a free account before you begin.
Prerequisites
Note
We recommend that you use the Azure Az PowerShell module to interact with Azure. To get started, see Install Azure PowerShell. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.
The PowerShell Az Module installed. (A sample install command follows this list.)
An SSH client. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
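If you still need the Az module, here is a minimal install sketch (it assumes access to the PowerShell Gallery; see the Install Azure PowerShell link in the note above for full guidance):
# Install the Az module for the current user from the PowerShell Gallery
Install-Module -Name Az -Repository PSGallery -Scope CurrentUser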
Sign in to Azure
Sign in to your Azure subscription with the Connect-AzAccount cmdlet and follow the on-screen directions.
# Log in to your Azure subscription
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
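To confirm which account and subscription the session will use, you can optionally run Get-AzContext:
# Show the account, tenant, and subscription for the current session
Get-AzContext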
Create resource group
Create an Azure resource group with New-AzResourceGroup. A resource group is a logical container into which Azure resources are deployed and managed. The following example prompts you for the name and location, and then creates a new resource group:
$resourceGroup = Read-Host -Prompt "Enter the resource group name"
$location = Read-Host -Prompt "Enter the Azure region to use"
New-AzResourceGroup -Name $resourceGroup -Location $location
Create a storage account
While Kafka on HDInsight uses Azure Managed Disks to store Kafka data, the cluster also uses Azure Storage to store information such as logs. Use New-AzStorageAccount to create a new storage account.
Important
Storage account kind BlobStorage can only be used as secondary storage for HDInsight clusters.
$storageName = Read-Host -Prompt "Enter the storage account name"
New-AzStorageAccount `
-ResourceGroupName $resourceGroup `
-Name $storageName `
-Location $location `
-SkuName Standard_LRS `
-Kind StorageV2 `
-EnableHttpsTrafficOnly $true
HDInsight stores data in the storage account in a blob container. Use New-AzStorageContainer to create a new container.
$containerName = Read-Host -Prompt "Enter the container name"
$storageKey = (Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageName)[0].Value
$storageContext = New-AzStorageContext `
-StorageAccountName $storageName `
-StorageAccountKey $storageKey
New-AzStorageContainer -Name $containerName -Context $storageContext
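Optionally, you can confirm the container was created by retrieving it with Get-AzStorageContainer:
# Retrieve the new container to confirm it exists
Get-AzStorageContainer -Name $containerName -Context $storageContext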
Create an Apache Kafka cluster
Create an Apache Kafka on HDInsight cluster with New-AzHDInsightCluster.
# Create a Kafka 2.4.1 cluster
$clusterName = Read-Host -Prompt "Enter the name of the Kafka cluster"
$httpCredential = Get-Credential -Message "Enter the cluster login credentials" -UserName "admin"
$sshCredentials = Get-Credential -Message "Enter the SSH user credentials" -UserName "sshuser"
$numberOfWorkerNodes = "4"
$clusterVersion = "5.0"
$clusterType="Kafka"
$disksPerNode=2
$kafkaConfig = New-Object "System.Collections.Generic.Dictionary``2[System.String,System.String]"
$kafkaConfig.Add("kafka", "2.4.1")
New-AzHDInsightCluster `
-ResourceGroupName $resourceGroup `
-ClusterName $clusterName `
-Location $location `
-ClusterSizeInNodes $numberOfWorkerNodes `
-ClusterType $clusterType `
-OSType "Linux" `
-Version $clusterVersion `
-ComponentVersion $kafkaConfig `
-HttpCredential $httpCredential `
-DefaultStorageAccountName "$storageName.blob.core.windows.net" `
-DefaultStorageAccountKey $storageKey `
-DefaultStorageContainer $containerName `
-SshCredential $sshCredentials `
-DisksPerWorkerNode $disksPerNode
It can take up to 20 minutes to create the HDInsight cluster.
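While you wait, you can check the provisioning state from a separate PowerShell session. A minimal sketch using the Get-AzHDInsightCluster cmdlet, reading the ClusterState property:
# Check the cluster state; "Running" means provisioning has completed
(Get-AzHDInsightCluster -ResourceGroupName $resourceGroup -ClusterName $clusterName).ClusterState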
The -DisksPerWorkerNode parameter configures the scalability of Kafka on HDInsight. Kafka on HDInsight uses the local disk of the virtual machines in the cluster to store data. Kafka is I/O heavy, so Azure Managed Disks are used to provide high throughput and more storage per node.
The type of managed disk can be either Standard (HDD) or Premium (SSD), and depends on the VM size used by the worker nodes (Kafka brokers). Premium disks are used automatically with DS and GS series VMs; all other VM sizes use standard disks. You can set the VM size by using the -WorkerNodeSize parameter. For more information on parameters, see the New-AzHDInsightCluster documentation.
If you plan to use more than 32 worker nodes (either at cluster creation or by scaling the cluster after creation), you must use the -HeadNodeSize parameter to specify a VM size with at least 8 cores and 14 GB of RAM. For more information on node sizes and associated costs, see HDInsight pricing.
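For example, here is a sketch of the creation command above with both sizes set explicitly. The VM sizes are illustrative assumptions, not requirements; verify that they're available in your region and subscription before using them:
# Sketch only: illustrative VM sizes.
# Standard_DS12_v2 workers are DS series, so they get premium (SSD) managed disks.
# Standard_D13_v2 head nodes have 8 cores and 56 GB of RAM, meeting the requirement above.
New-AzHDInsightCluster `
-ResourceGroupName $resourceGroup `
-ClusterName $clusterName `
-Location $location `
-ClusterSizeInNodes $numberOfWorkerNodes `
-ClusterType $clusterType `
-OSType "Linux" `
-Version $clusterVersion `
-ComponentVersion $kafkaConfig `
-HttpCredential $httpCredential `
-DefaultStorageAccountName "$storageName.blob.core.windows.net" `
-DefaultStorageAccountKey $storageKey `
-DefaultStorageContainer $containerName `
-SshCredential $sshCredentials `
-DisksPerWorkerNode $disksPerNode `
-WorkerNodeSize "Standard_DS12_v2" `
-HeadNodeSize "Standard_D13_v2"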
Connect to the cluster
To connect to the primary head node of the Kafka cluster, use the following command. Replace sshuser with the SSH user name. Replace mykafka with the name of your Kafka cluster:
ssh sshuser@mykafka-ssh.azurehdinsight.net
When you first connect to the cluster, your SSH client may display a warning that the authenticity of the host can't be established. When prompted, type yes, and then press Enter to add the host to your SSH client's trusted server list.
When prompted, enter the password for the SSH user.
Once connected, you see information similar to the following text:
Authorized uses only. All activity may be monitored and reported.
Welcome to Ubuntu 16.04.4 LTS (GNU/Linux 4.13.0-1011-azure x86_64)
* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage
Get cloud support with Ubuntu Advantage Cloud Guest:
https://www.ubuntu.com/business/services/cloud
83 packages can be updated.
37 updates are security updates.
Welcome to Kafka on HDInsight.
Last login: Thu Mar 29 13:25:27 2018 from 108.252.109.241
Get the Apache Zookeeper and Broker host information
When working with Kafka, you must know the Apache Zookeeper and Broker hosts. These hosts are used with the Kafka API and many of the utilities that ship with Kafka.
In this section, you get the host information from the Apache Ambari REST API on the cluster.
From the SSH connection to the cluster, use the following command to install the jq utility. This utility is used to parse JSON documents, and is useful in retrieving the host information:
sudo apt -y install jq
To set an environment variable to the cluster name, use the following command:
read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
When prompted, enter the name of the Kafka cluster.
To set an environment variable with Zookeeper host information, use the following command. The command retrieves all Zookeeper hosts, then returns only the first two entries; this gives you redundancy in case one host is unreachable.
export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
When prompted, enter the password for the cluster login account (not the SSH account).
To verify that the environment variable is set correctly, use the following command:
echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
This command returns information similar to the following text:
<zookeepername1>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,<zookeepername2>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
To set an environment variable with Kafka broker host information, use the following command:
export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
When prompted, enter the password for the cluster login account (not the SSH account).
To verify that the environment variable is set correctly, use the following command:
echo '$KAFKABROKERS='$KAFKABROKERS
This command returns information similar to the following text:
<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Manage Apache Kafka topics
Kafka stores streams of data in topics. You can use the kafka-topics.sh utility to manage topics.
To create a topic, use the following command in the SSH connection:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
This command connects to Zookeeper using the host information stored in $KAFKAZKHOSTS. It then creates a Kafka topic named test. Data stored in this topic is partitioned across eight partitions, and each partition is replicated across three worker nodes in the cluster.
If you created the cluster in an Azure region that provides three fault domains, use a replication factor of 3. Otherwise, use a replication factor of 4.
In regions with three fault domains, a replication factor of 3 allows replicas to be spread across the fault domains. In regions with two fault domains, a replication factor of 4 spreads the replicas evenly across the domains.
For information on the number of fault domains in a region, see the Availability of Linux virtual machines document.
Kafka is not aware of Azure fault domains. When creating partition replicas for topics, it may not distribute replicas properly for high availability.
To ensure high availability, use the Apache Kafka partition rebalance tool. This tool must be run from an SSH connection to the head node of your Kafka cluster.
For the highest availability of your Kafka data, you should rebalance the partition replicas for your topic when:
You create a new topic or partition
You scale up a cluster
To list topics, use the following command:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
This command lists the topics available on the Kafka cluster.
To delete a topic, use the following command:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
This command deletes the topic named topicname.
Warning
If you delete the test topic created earlier, then you must recreate it. It is used by steps later in this document.
For more information on the commands available with the kafka-topics.sh utility, use the following command:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh
Produce and consume records
Kafka stores records in topics. Records are produced by producers, and consumed by consumers. Producers and consumers communicate with the Kafka broker service. Each worker node in your HDInsight cluster is a Kafka broker host.
To store records into the test topic you created earlier, and then read them using a consumer, use the following steps:
To write records to the topic, use the kafka-console-producer.sh utility from the SSH connection:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
This command leaves you at an empty line. Type a text message on the empty line and press Enter. Enter a few messages this way, and then use Ctrl + C to return to the normal prompt. Each line is sent as a separate record to the Kafka topic.
To read records from the topic, use the kafka-console-consumer.sh utility from the SSH connection:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
This command retrieves the records from the topic and displays them. Using --from-beginning tells the consumer to start from the beginning of the stream, so all records are retrieved.
If you are using an older version of Kafka, replace --bootstrap-server $KAFKABROKERS with --zookeeper $KAFKAZKHOSTS.
Use Ctrl + C to stop the consumer.
You can also programmatically create producers and consumers. For an example of using this API, see the Apache Kafka Producer and Consumer API with HDInsight document.
Clean up resources
When no longer needed, you can use the Remove-AzResourceGroup command to remove the resource group, the HDInsight cluster, and all related resources.
Remove-AzResourceGroup -Name $resourceGroup
Warning
HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. Billing is prorated per minute, so you should always delete your cluster when it is no longer in use.
Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.