Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Apache Kafka — это распределенная платформа потоковой передачи с открытым кодом. Она часто используется как брокер сообщений, предоставляя такие же функциональные возможности, как и очередь сообщений типа "публикация-подписка".
В этом кратком руководстве вы узнаете, как создать кластер Apache Kafka с помощью Azure PowerShell. Вы также узнаете, как с помощью предоставленных служебных программ отправлять и получать сообщения, используя Kafka.
Предупреждение
Расходы за кластеры HDInsight рассчитываются пропорционально по минутам, независимо от их использования. Обязательно удалите кластер, когда завершите его использование. См. как удалить кластер HDInsight.
Доступ к API Kafka имеют только ресурсы в одной виртуальной сети. В этом кратком руководстве вы напрямую обращаетесь к кластеру по протоколу SSH. Чтобы подключить к Kafka другие службы, сети или виртуальные машины, необходимо сначала создать виртуальную сеть, а затем создать в ней эти ресурсы. Дополнительные сведения см. в документе Подключение к Apache Kafka с использованием виртуальной сети.
Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
Предварительные требования
Примечание.
Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Чтобы узнать, как перейти на модуль Az PowerShell, см. статью Перенос Azure PowerShell с AzureRM на Az.
Модуль Az для PowerShell установлен.
Клиент SSH. Дополнительные сведения см. в руководстве по подключению к HDInsight (Apache Hadoop) с помощью SSH.
Вход в Azure
Войдите в подписку Azure с помощью командлета Connect-AzAccount
и следуйте инструкциям на экране.
# Login to your Azure subscription
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
Создать группу ресурсов
Создайте группу ресурсов Azure с помощью командлета PowerShell New-AzResourceGroup. Группа ресурсов — это логический контейнер, в котором происходит развертывание ресурсов Azure и управление ими. В следующем примере запрашиваются у вас имя и расположение, а затем создается новая группа ресурсов.
$resourceGroup = Read-Host -Prompt "Enter the resource group name"
$location = Read-Host -Prompt "Enter the Azure region to use"
New-AzResourceGroup -Name $resourceGroup -Location $location
Создание учетной записи хранилища
Хотя кластер Kafka в HDInsight использует управляемые диски Azure для хранения данных Kafka, он также использует службу хранилища Azure для хранения различных сведений, например журналов. Используйте командлет New-AzStorageAccount, чтобы создать учетную запись хранения.
Внимание
Тип учетной записи хранения BlobStorage
можно использовать только как дополнительное хранилище кластеров HDInsight.
$storageName = Read-Host -Prompt "Enter the storage account name"
New-AzStorageAccount `
-ResourceGroupName $resourceGroup `
-Name $storageName `
-Location $location `
-SkuName Standard_LRS `
-Kind StorageV2 `
-EnableHttpsTrafficOnly 1
HDInsight сохраняет данные в учетной записи Azure в Blob-контейнере. Создайте контейнер с помощью командлета New-AzStorageContainer.
$containerName = Read-Host -Prompt "Enter the container name"
$storageKey = (Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageName)[0].Value
$storageContext = New-AzStorageContext `
-StorageAccountName $storageName `
-StorageAccountKey $storageKey
New-AzStorageContainer -Name $containerName -Context $storageContext
Создание кластера Apache Kafka
Создайте кластер Apache Kafka в HDInsight с помощью командлета New-AzHDInsightCluster.
# Create a Kafka 2.4.1 cluster
$clusterName = Read-Host -Prompt "Enter the name of the Kafka cluster"
$httpCredential = Get-Credential -Message "Enter the cluster login credentials" -UserName "admin"
$sshCredentials = Get-Credential -Message "Enter the SSH user credentials" -UserName "sshuser"
$numberOfWorkerNodes = "4"
$clusterVersion = "5.0"
$clusterType="Kafka"
$disksPerNode=2
$kafkaConfig = New-Object "System.Collections.Generic.Dictionary``2[System.String,System.String]"
$kafkaConfig.Add("kafka", "2.4.1")
New-AzHDInsightCluster `
-ResourceGroupName $resourceGroup `
-ClusterName $clusterName `
-Location $location `
-ClusterSizeInNodes $numberOfWorkerNodes `
-ClusterType $clusterType `
-OSType "Linux" `
-Version $clusterVersion `
-ComponentVersion $kafkaConfig `
-HttpCredential $httpCredential `
-DefaultStorageAccountName "$storageName.blob.core.windows.net" `
-DefaultStorageAccountKey $storageKey `
-DefaultStorageContainer $clusterName `
-SshCredential $sshCredentials `
-DisksPerWorkerNode $disksPerNode
Операция создания кластера HDInsight может занять до 20 минут.
Параметр -DisksPerWorkerNode
позволяет настроить уровень масштабируемости Kafka в HDInsight. Кластер Kafka в HDInsight использует локальный диск виртуальных машин в кластере для хранения данных. Kafka обрабатывает большое количество операций ввода-вывода, поэтому используются управляемые диски Azure, чтобы обеспечить высокую пропускную способность и предоставить дополнительное хранилище для каждого узла.
Управляемый диск может быть двух типов: Стандартный (HDD) или Премиум (SSD). Тип диска зависит от размера виртуальной машины, используемой рабочими узлами (брокерами Kafka). Диски категории "Премиум" автоматически используются для виртуальных машин серий DS и GS. Для всех остальных типов виртуальных машин используется стандарт. Настроить тип виртуальной машины можно с помощью параметра -WorkerNodeSize
. Дополнительные сведения о параметрах см. в документации по New-AzHDInsightCluster.
Если вы планируете использовать более 32 рабочих узлов (при создании кластера или в ходе масштабирования после создания кластера), необходимо использовать параметр -HeadNodeSize
, чтобы указать размер виртуальной машины, обеспечивающий как минимум 8 процессорных ядер и 14 ГБ ОЗУ. Дополнительные сведения о размерах узлов и их стоимости см. на странице с ценами на HDInsight.
Подключение к кластеру
Чтобы подключиться к первичному головному узлу кластера Kafka, выполните приведенную ниже команду. Замените
sshuser
именем пользователя SSH. Заменитеmykafka
именем кластера Kafka.ssh sshuser@mykafka-ssh.azurehdinsight.net
При первом подключении к кластеру клиент SSH может отобразить предупреждение о том, что не удается установить подлинность узла. При появлении запроса введите yes (Да) и нажмите клавишу ВВОД, чтобы добавить узел в список доверенных серверов клиента SSH.
При появлении запроса введите пароль пользователя SSH.
После подключения отобразятся сведения, аналогичные приведенному ниже тексту.
Authorized uses only. All activity may be monitored and reported.
Welcome to Ubuntu 16.04.4 LTS (GNU/Linux 4.13.0-1011-azure x86_64)
* Documentation: https://help.ubuntu.com
* Management: https://landscape.canonical.com
* Support: https://ubuntu.com/advantage
Get cloud support with Ubuntu Advantage Cloud Guest:
https://www.ubuntu.com/business/services/cloud
83 packages can be updated.
37 updates are security updates.
Welcome to Kafka on HDInsight.
Last login: Thu Mar 29 13:25:27 2018 from 108.252.109.241
Получите информацию о хостах Apache Zookeeper и брокера
Для работы с Kafka необходимы сведения об узле Apache Zookeeper и узле брокера. Эти узлы используются Kafka API и многими другими служебными программами, поставляемыми с платформой Kafka.
В этом разделе вы получаете информацию об узле из REST API Apache Ambari в кластере.
Используя SSH-подключение к кластеру, выполните следующую команду, чтобы установить служебную программу
jq
. Эта служебная программа используется для анализа документов JSON. С ее помощью также удобно получать сведения об узлах.sudo apt -y install jq
Чтобы задать переменную среды с именем кластера, выполните следующую команду.
read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
При появлении запроса введите имя кластера Kafka.
Чтобы задать переменную среды с информацией о узле Zookeeper, используйте следующую команду. Команда извлекает все хосты Zookeeper, затем возвращает только первые две записи. Причина этого в том, что вы хотите обеспечить избыточность на случай, если один из узлов станет недоступным.
export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
При появлении запроса введите пароль учетной записи для входа в кластер (но не учетной записи SSH).
Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.
echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
Эта команда возвращает сведения аналогичные следующим:
<zookeepername1>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,<zookeepername2>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Чтобы задать переменную среды со сведениями об узле брокера Kafka, выполните следующую команду.
export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
При появлении запроса введите пароль учетной записи для входа в кластер (но не учетной записи SSH).
Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.
echo '$KAFKABROKERS='$KAFKABROKERS
Эта команда возвращает сведения аналогичные следующим:
<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Управление разделами Apache Kafka
Kafka хранит потоки данных в разделах. Для управления разделами можно использовать служебную программу kafka-topics.sh
.
Чтобы создать раздел, в сеансе SSH-подключения выполните следующую команду.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
Эта команда создает подключение к Zookeeper, используя хранящиеся в
$KAFKAZKHOSTS
сведения об узле. Затем она создает раздел Kafka с именем test.Данные, хранящиеся в этом разделе, разделены на восемь секций.
Каждый раздел реплицируется на три узла в кластере.
Если кластер был создан в регионе Azure, который предоставляет три домена сбоя, то следует использовать коэффициент репликации, равный 3. В противном случае следует использовать коэффициент репликации, равный 4.
В регионах с тремя доменами сбоя коэффициент репликации, равный 3, обеспечивает распределение реплик по доменам сбоя. В регионах с двумя доменами сбоя коэффициент репликации, равный 4, обеспечивает равномерное распределение реплик по доменам сбоя.
Для получения информации о количестве доменов сбоя в регионе см. документ Доступность виртуальных машин Linux.
В Kafka нет сведений о доменах сбоя Azure. При создании реплик разделов для тем они могут быть неправильно распределены, что снижает их высокую доступность.
Для обеспечения высокого уровня доступности используйте средство перераспределения секций Apache Kafka. Этот инструмент следует запускать из сеанса SSH-подключения на головном узле кластера Kafka.
Чтобы обеспечить максимально высокий уровень доступности данных Kafka, следует выполнить перераспределение реплик разделов для вашей темы в следующих случаях:
Вы создаете новую тему или раздел
Вы увеличиваете масштаб кластера.
Чтобы вывести список разделов, введите следующую команду.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Она выводит список разделов на кластере Kafka.
Чтобы удалить раздел, используйте следующую команду.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
Эта команда удаляет тему
topicname
.Предупреждение
Если вы уже удалили раздел
test
, то необходимо создать его заново. Он используется в последующих действиях в этом документе.
Чтобы получить дополнительные сведения о командах, доступных в служебной программе kafka-topics.sh
, используйте следующую команду.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh
Создание и использование записей
В Kafka записи хранятся в разделах. Записи создаются производителями, а используются потребителями. Производители и потребители взаимодействуют со службой брокера Kafka. Каждый рабочий узел в кластере HDInsight — это узел брокера Kafka.
Чтобы сохранить записи в созданный ранее раздел test, а затем считать их с помощью потребителя, сделайте следующее:
Чтобы записать записи в раздел, используйте служебную программу
kafka-console-producer.sh
из сеанса SSH-подключения./usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
После выполнения этой команды вы перейдете к пустой строке.
Введите текстовое сообщение в пустую строку и нажмите клавишу ВВОД. Введите несколько таких сообщений, а затем нажмите клавиши CTRL+C, чтобы вернуться к обычной командной строке. Каждая строка отправляется в раздел Kafka как отдельная запись.
Чтобы считать записи из раздела, используйте служебную программу
kafka-console-consumer.sh
из сеанса SSH-подключения./usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
Эта команда извлекает записи из раздела, а затем отображает их. Параметр
--from-beginning
указывает потребителю считывать данные с самого начала потока, поэтому будут извлечены все записи.Если вы используете более раннюю версию Kafka, замените
--bootstrap-server $KAFKABROKERS
на--zookeeper $KAFKAZKHOSTS
.Нажмите клавиши Ctrl+C, чтобы остановить процесс.
Вы также можете программно создавать производителей и потребителей. Пример использования этого API см. в руководстве Использование API производителя и потребителя Apache Kafka.
Очистка ресурсов
Вы можете удалить ненужную группу ресурсов, кластер HDInsight и все связанные с ним ресурсы, выполнив команду Remove-AzResourceGroup.
Remove-AzResourceGroup -Name $resourceGroup
Предупреждение
Выставление счетов для кластера HDInsight начинается сразу после его создания и прекращается при удалении кластера. Платежи рассчитываются поминутно, поэтому всегда следует удалять кластер, когда он больше не нужен.
При удалении кластера Kafka в HDInsight удаляются все данные, хранящиеся в Kafka.