Поделиться через


Краткое руководство: Создание кластера Apache Kafka в Azure HDInsight с помощью PowerShell

Apache Kafka — это распределенная платформа потоковой передачи с открытым кодом. Она часто используется как брокер сообщений, предоставляя такие же функциональные возможности, как и очередь сообщений типа "публикация-подписка".

В этом кратком руководстве вы узнаете, как создать кластер Apache Kafka с помощью Azure PowerShell. Вы также узнаете, как с помощью предоставленных служебных программ отправлять и получать сообщения, используя Kafka.

Предупреждение

Расходы за кластеры HDInsight рассчитываются пропорционально по минутам, независимо от их использования. Обязательно удалите кластер, когда завершите его использование. См. как удалить кластер HDInsight.

Доступ к API Kafka имеют только ресурсы в одной виртуальной сети. В этом кратком руководстве вы напрямую обращаетесь к кластеру по протоколу SSH. Чтобы подключить к Kafka другие службы, сети или виртуальные машины, необходимо сначала создать виртуальную сеть, а затем создать в ней эти ресурсы. Дополнительные сведения см. в документе Подключение к Apache Kafka с использованием виртуальной сети.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Предварительные требования

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Чтобы узнать, как перейти на модуль Az PowerShell, см. статью Перенос Azure PowerShell с AzureRM на Az.

Вход в Azure

Войдите в подписку Azure с помощью командлета Connect-AzAccount и следуйте инструкциям на экране.

# Login to your Azure subscription
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

Создать группу ресурсов

Создайте группу ресурсов Azure с помощью командлета PowerShell New-AzResourceGroup. Группа ресурсов — это логический контейнер, в котором происходит развертывание ресурсов Azure и управление ими. В следующем примере запрашиваются у вас имя и расположение, а затем создается новая группа ресурсов.

$resourceGroup = Read-Host -Prompt "Enter the resource group name"
$location = Read-Host -Prompt "Enter the Azure region to use"

New-AzResourceGroup -Name $resourceGroup -Location $location

Создание учетной записи хранилища

Хотя кластер Kafka в HDInsight использует управляемые диски Azure для хранения данных Kafka, он также использует службу хранилища Azure для хранения различных сведений, например журналов. Используйте командлет New-AzStorageAccount, чтобы создать учетную запись хранения.

Внимание

Тип учетной записи хранения BlobStorage можно использовать только как дополнительное хранилище кластеров HDInsight.

$storageName = Read-Host -Prompt "Enter the storage account name"

New-AzStorageAccount `
    -ResourceGroupName $resourceGroup `
    -Name $storageName `
    -Location $location `
    -SkuName Standard_LRS `
    -Kind StorageV2 `
    -EnableHttpsTrafficOnly 1

HDInsight сохраняет данные в учетной записи Azure в Blob-контейнере. Создайте контейнер с помощью командлета New-AzStorageContainer.

$containerName = Read-Host -Prompt "Enter the container name"

$storageKey = (Get-AzStorageAccountKey `
                -ResourceGroupName $resourceGroup `
                -Name $storageName)[0].Value
$storageContext = New-AzStorageContext `
                    -StorageAccountName $storageName `
                    -StorageAccountKey $storageKey

New-AzStorageContainer -Name $containerName -Context $storageContext

Создание кластера Apache Kafka

Создайте кластер Apache Kafka в HDInsight с помощью командлета New-AzHDInsightCluster.

# Create a Kafka 2.4.1 cluster
$clusterName = Read-Host -Prompt "Enter the name of the Kafka cluster"
$httpCredential = Get-Credential -Message "Enter the cluster login credentials" -UserName "admin"
$sshCredentials = Get-Credential -Message "Enter the SSH user credentials" -UserName "sshuser"

$numberOfWorkerNodes = "4"
$clusterVersion = "5.0"
$clusterType="Kafka"
$disksPerNode=2

$kafkaConfig = New-Object "System.Collections.Generic.Dictionary``2[System.String,System.String]"
$kafkaConfig.Add("kafka", "2.4.1")

New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroup `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $numberOfWorkerNodes `
        -ClusterType $clusterType `
        -OSType "Linux" `
        -Version $clusterVersion `
        -ComponentVersion $kafkaConfig `
        -HttpCredential $httpCredential `
        -DefaultStorageAccountName "$storageName.blob.core.windows.net" `
        -DefaultStorageAccountKey $storageKey `
        -DefaultStorageContainer $clusterName `
        -SshCredential $sshCredentials `
        -DisksPerWorkerNode $disksPerNode

Операция создания кластера HDInsight может занять до 20 минут.

Параметр -DisksPerWorkerNode позволяет настроить уровень масштабируемости Kafka в HDInsight. Кластер Kafka в HDInsight использует локальный диск виртуальных машин в кластере для хранения данных. Kafka обрабатывает большое количество операций ввода-вывода, поэтому используются управляемые диски Azure, чтобы обеспечить высокую пропускную способность и предоставить дополнительное хранилище для каждого узла.

Управляемый диск может быть двух типов: Стандартный (HDD) или Премиум (SSD). Тип диска зависит от размера виртуальной машины, используемой рабочими узлами (брокерами Kafka). Диски категории "Премиум" автоматически используются для виртуальных машин серий DS и GS. Для всех остальных типов виртуальных машин используется стандарт. Настроить тип виртуальной машины можно с помощью параметра -WorkerNodeSize. Дополнительные сведения о параметрах см. в документации по New-AzHDInsightCluster.

Если вы планируете использовать более 32 рабочих узлов (при создании кластера или в ходе масштабирования после создания кластера), необходимо использовать параметр -HeadNodeSize, чтобы указать размер виртуальной машины, обеспечивающий как минимум 8 процессорных ядер и 14 ГБ ОЗУ. Дополнительные сведения о размерах узлов и их стоимости см. на странице с ценами на HDInsight.

Подключение к кластеру

  1. Чтобы подключиться к первичному головному узлу кластера Kafka, выполните приведенную ниже команду. Замените sshuser именем пользователя SSH. Замените mykafka именем кластера Kafka.

    ssh sshuser@mykafka-ssh.azurehdinsight.net
    
  2. При первом подключении к кластеру клиент SSH может отобразить предупреждение о том, что не удается установить подлинность узла. При появлении запроса введите yes (Да) и нажмите клавишу ВВОД, чтобы добавить узел в список доверенных серверов клиента SSH.

  3. При появлении запроса введите пароль пользователя SSH.

После подключения отобразятся сведения, аналогичные приведенному ниже тексту.

Authorized uses only. All activity may be monitored and reported.
Welcome to Ubuntu 16.04.4 LTS (GNU/Linux 4.13.0-1011-azure x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

  Get cloud support with Ubuntu Advantage Cloud Guest:
    https://www.ubuntu.com/business/services/cloud

83 packages can be updated.
37 updates are security updates.



Welcome to Kafka on HDInsight.

Last login: Thu Mar 29 13:25:27 2018 from 108.252.109.241

Получите информацию о хостах Apache Zookeeper и брокера

Для работы с Kafka необходимы сведения об узле Apache Zookeeper и узле брокера. Эти узлы используются Kafka API и многими другими служебными программами, поставляемыми с платформой Kafka.

В этом разделе вы получаете информацию об узле из REST API Apache Ambari в кластере.

  1. Используя SSH-подключение к кластеру, выполните следующую команду, чтобы установить служебную программу jq. Эта служебная программа используется для анализа документов JSON. С ее помощью также удобно получать сведения об узлах.

    sudo apt -y install jq
    
  2. Чтобы задать переменную среды с именем кластера, выполните следующую команду.

    read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
    

    При появлении запроса введите имя кластера Kafka.

  3. Чтобы задать переменную среды с информацией о узле Zookeeper, используйте следующую команду. Команда извлекает все хосты Zookeeper, затем возвращает только первые две записи. Причина этого в том, что вы хотите обеспечить избыточность на случай, если один из узлов станет недоступным.

    export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    

    При появлении запроса введите пароль учетной записи для входа в кластер (но не учетной записи SSH).

  4. Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.

     echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
    

    Эта команда возвращает сведения аналогичные следующим:

    <zookeepername1>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,<zookeepername2>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  5. Чтобы задать переменную среды со сведениями об узле брокера Kafka, выполните следующую команду.

    export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    

    При появлении запроса введите пароль учетной записи для входа в кластер (но не учетной записи SSH).

  6. Чтобы убедиться, что переменную среды задано верно, выполните следующую команду.

    echo '$KAFKABROKERS='$KAFKABROKERS
    

    Эта команда возвращает сведения аналогичные следующим:

    <brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

Управление разделами Apache Kafka

Kafka хранит потоки данных в разделах. Для управления разделами можно использовать служебную программу kafka-topics.sh.

  • Чтобы создать раздел, в сеансе SSH-подключения выполните следующую команду.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    

    Эта команда создает подключение к Zookeeper, используя хранящиеся в $KAFKAZKHOSTS сведения об узле. Затем она создает раздел Kafka с именем test.

    • Данные, хранящиеся в этом разделе, разделены на восемь секций.

    • Каждый раздел реплицируется на три узла в кластере.

      Если кластер был создан в регионе Azure, который предоставляет три домена сбоя, то следует использовать коэффициент репликации, равный 3. В противном случае следует использовать коэффициент репликации, равный 4.

      В регионах с тремя доменами сбоя коэффициент репликации, равный 3, обеспечивает распределение реплик по доменам сбоя. В регионах с двумя доменами сбоя коэффициент репликации, равный 4, обеспечивает равномерное распределение реплик по доменам сбоя.

      Для получения информации о количестве доменов сбоя в регионе см. документ Доступность виртуальных машин Linux.

      В Kafka нет сведений о доменах сбоя Azure. При создании реплик разделов для тем они могут быть неправильно распределены, что снижает их высокую доступность.

      Для обеспечения высокого уровня доступности используйте средство перераспределения секций Apache Kafka. Этот инструмент следует запускать из сеанса SSH-подключения на головном узле кластера Kafka.

      Чтобы обеспечить максимально высокий уровень доступности данных Kafka, следует выполнить перераспределение реплик разделов для вашей темы в следующих случаях:

      • Вы создаете новую тему или раздел

      • Вы увеличиваете масштаб кластера.

  • Чтобы вывести список разделов, введите следующую команду.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Она выводит список разделов на кластере Kafka.

  • Чтобы удалить раздел, используйте следующую команду.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
    

    Эта команда удаляет тему topicname.

    Предупреждение

    Если вы уже удалили раздел test, то необходимо создать его заново. Он используется в последующих действиях в этом документе.

Чтобы получить дополнительные сведения о командах, доступных в служебной программе kafka-topics.sh, используйте следующую команду.

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh

Создание и использование записей

В Kafka записи хранятся в разделах. Записи создаются производителями, а используются потребителями. Производители и потребители взаимодействуют со службой брокера Kafka. Каждый рабочий узел в кластере HDInsight — это узел брокера Kafka.

Чтобы сохранить записи в созданный ранее раздел test, а затем считать их с помощью потребителя, сделайте следующее:

  1. Чтобы записать записи в раздел, используйте служебную программу kafka-console-producer.sh из сеанса SSH-подключения.

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
    

    После выполнения этой команды вы перейдете к пустой строке.

  2. Введите текстовое сообщение в пустую строку и нажмите клавишу ВВОД. Введите несколько таких сообщений, а затем нажмите клавиши CTRL+C, чтобы вернуться к обычной командной строке. Каждая строка отправляется в раздел Kafka как отдельная запись.

  3. Чтобы считать записи из раздела, используйте служебную программу kafka-console-consumer.sh из сеанса SSH-подключения.

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
    

    Эта команда извлекает записи из раздела, а затем отображает их. Параметр --from-beginning указывает потребителю считывать данные с самого начала потока, поэтому будут извлечены все записи.

    Если вы используете более раннюю версию Kafka, замените --bootstrap-server $KAFKABROKERS на --zookeeper $KAFKAZKHOSTS.

  4. Нажмите клавиши Ctrl+C, чтобы остановить процесс.

Вы также можете программно создавать производителей и потребителей. Пример использования этого API см. в руководстве Использование API производителя и потребителя Apache Kafka.

Очистка ресурсов

Вы можете удалить ненужную группу ресурсов, кластер HDInsight и все связанные с ним ресурсы, выполнив команду Remove-AzResourceGroup.

Remove-AzResourceGroup -Name $resourceGroup

Предупреждение

Выставление счетов для кластера HDInsight начинается сразу после его создания и прекращается при удалении кластера. Платежи рассчитываются поминутно, поэтому всегда следует удалять кластер, когда он больше не нужен.

При удалении кластера Kafka в HDInsight удаляются все данные, хранящиеся в Kafka.

Следующие шаги