Репликация разделов Apache Kafka с помощью Kafka в HDInsight и MirrorMaker

Узнайте, как реплицировать разделы во вторичный кластер с помощью функции зеркального отображения Apache Kafka. Зеркальное отображение можно выполнять как непрерывный процесс или периодически для переноса данных из одного кластера в другой.

В этой статье зеркальное отображение используется для репликации разделов между двумя кластерами HDInsight. Эти кластеры находятся в разных виртуальных сетях и в разных центрах обработки данных.

Предупреждение

Не используйте зеркальное отображение как средство для достижения отказоустойчивости. Из-за разного смещения элементов в разделах основного и вторичного кластеров они не могут использоваться как взаимозаменяемые. Если вас интересует отказоустойчивость, настройте репликацию для разделов в пределах кластера. Дополнительные сведения см. в статье по началу работы с Apache Kafka в HDInsight.

Как работает зеркальное отображение Apache Kafka

Зеркальное отображение работает с помощью средства MirrorMaker, которое является частью Apache Kafka. MirrorMaker использует записи из разделов в основном кластере, а затем создает локальную копию в дополнительном кластере. MirrorMaker использует один получатель (или несколько), которые считывают основной кластер, и производитель, который записывает данные в локальный (вторичный) кластер.

В наиболее полезной настройке зеркального отображения для аварийного восстановления используются кластеры Kafka в разных регионах Azure. Для этого виртуальные сети, в которых находятся кластеры, находятся в одноранговом узле.

На следующей схеме показан процесс зеркального отображения и обмен данными между кластерами.

Схема процесса зеркального отображения.

Основной и вторичный кластеры могут отличаться количеством узлов и секций. Смещения в темах также отличаются. Зеркальное отображение сохраняет значение ключа, который используется для секционирования, поэтому порядок записей сохраняется вместе с этими значениями.

Зеркальное отображение в пределах сети

Чтобы реализовать зеркальное отображение между кластерами Kafka в разных сетях, следует учитывать следующее.

  • Шлюзы: сети должны взаимодействовать на уровне TCP/IP.

  • Серверная адресация: можно выбрать адреса узлов кластера, используя их IP-адреса или полные доменные имена.

    • IP-адреса: если вы настраиваете кластеры Kafka для использования объявления IP-адресов, продолжить установку зеркального отображения можно, используя IP-адреса узлов брокера и узлов ZooKeeper.

    • Доменные имена: если вы не настраиваете кластеры Kafka для объявления IP-адресов, кластеры должны иметь возможность подключаться друг к другу с помощью полных доменных имен (FQDN). Потребуется установить в каждой сети сервер службы доменных имен (DNS), который настроен для пересылки запросов в другие сети. При создании виртуальной сети Azure вместо использования автоматического DNS, предоставленного сетью, необходимо указать пользовательские DNS-сервер и IP-адрес для сервера. После создания виртуальной сети необходимо создать виртуальную машину Azure, которая использует этот IP-адрес. Затем на ней устанавливается и настраивается программное обеспечение DNS.

    Важно!

    Создайте и настройте пользовательский DNS-сервер, прежде чем устанавливать HDInsight в виртуальной сети. HDInsight не нужно дополнительно настраивать для использования DNS-сервера, настроенного для виртуальной сети.

Дополнительные сведения о подключении двух виртуальных сетей Azure см. в статье Настройка подключения.

Зеркальное отображение архитектуры

Эта архитектура включает в себя два кластера в разных группах ресурсов и виртуальных сетях: первичный и вторичный.

Этапы создания

  1. Создание двух новых групп ресурсов

    Группа ресурсов Расположение
    kafka-primary-rg Центральная часть США
    kafka-secondary-rg Центрально-северная часть США
  2. Создайте новую виртуальную сеть kafka-primary-vnet в расположении kafka-primary-rg. Оставьте параметры по умолчанию.

  3. Создайте новую виртуальную сеть kafka-secondary-vnet в расположении kafka-secondary-rg, также с параметрами по умолчанию.

  4. Создайте два новых кластера Kafka:

    Имя кластера Группа ресурсов Виртуальная сеть Учетная запись хранения
    kafka-primary-cluster kafka-primary-rg kafka-primary-vnet kafkaprimarystorage
    kafka-secondary-cluster kafka-secondary-rg kafka-secondary-vnet kafkasecondarystorage
  5. Создайте пиринг между виртуальными сетями. На этом шаге будет создано два пиринга: один из kafka-primary-vnet в kafka-secondary-vnet и один наоборот из kafka-secondary-vnet в kafka-primary-vnet.

    1. Выберите виртуальную сеть kafka-primary-vnet.

    2. В разделе Параметры щелкните Пиринги.

    3. Выберите Добавить.

    4. На экране Добавить пиринг введите сведения, как показано на снимке экрана ниже.

      Снимок экрана: добавление пиринга между виртуальными сетями в HD Insight Kafka.

Настройка объявления IP-адресов

Настройте объявления IP-адресов, чтобы разрешить клиенту подключаться по IP-адресам брокера вместо доменных имен.

  1. Перейдите на панель мониторинга Ambari для основного кластера: https://PRIMARYCLUSTERNAME.azurehdinsight.net.

  2. Выберите Службы>Kafka. Выберите вкладку Конфигурации .

  3. Добавьте следующие строки конфигурации в нижнюю секцию шаблон kafka-env. Щелкните Сохранить.

    # Configure Kafka to advertise IP addresses instead of FQDN
    IP_ADDRESS=$(hostname -i)
    echo advertised.listeners=$IP_ADDRESS
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
    
  4. Введите примечание на экране Сохранения конфигурации и выберите Сохранить.

  5. Если вы получаете предупреждение о конфигурации, выберите Все равно продолжить.

  6. В области Сохранить изменения конфигурации выберите OK.

  7. У уведомлении Требуется перезагрузка выберите Перезапустить>Перезапустить все затронутое. Нажмите Подтвердить перезапуск всех.

    Снимок экрана: параметр Apache Ambari для перезапуска всех затронутых.

Настройте Kafka для прослушивания всех сетевых интерфейсов

  1. Оставайтесь на вкладке Конфигурации в разделе Службы>Kafka. В разделе Kafka Broker(Брокер Kafka) задайте для свойства слушатели значение PLAINTEXT://0.0.0.0:9092.
  2. Щелкните Сохранить.
  3. Выберите Перезапустить>Подтвердить перезапуск всех.

Запись IP-адреса брокера и адреса ZooKeeper для основного кластера.

  1. Выберите Узлы на панели мониторинга Ambari.

  2. Запишите IP-адреса для брокера и ZooKeeper. Узлы брокера имеют wn в качестве первых двух букв имени узла, а узлы ZooKeeper имеют zk в качестве первых двух букв имени узла.

    Снимок экрана: i p-адреса узла представления Apache Ambari.

  3. Повторите предыдущие три шага для второго кластера kafka-secondary-cluster. Настройте объявления IP-адресов, установите прослушиватели, а также запишите IP-адреса брокера иZooKeeper.

Создание разделов

  1. Подключитесь к основному кластеру с помощью SSH.

    ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
    

    Замените sshuser именем пользователя SSH, которое использовалось при создании кластера. Замените PRIMARYCLUSTER базовым именем, которое использовалось при создании кластера.

    Дополнительные сведения см. в статье Использование SSH с Hadoop на основе Linux в HDInsight из Linux, Unix или OS X.

  2. Используйте следующую команду, чтобы создать две переменные среды с узлами Apache ZooKeeper и брокера для основного кластера. Замените такие строки, как ZOOKEEPER_IP_ADDRESS1, фактическими IP-адресами, записанными ранее, например 10.23.0.11 и 10.23.0.7. То же самое касается BROKER_IP_ADDRESS1. Если вы используете разрешение полного доменного имени для настраиваемого DNS-сервера, выполните указанные далее действия, чтобы получить имена брокера и ZooKeeper.

    # get the ZooKeeper hosts for the primary cluster
    export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
    
    # get the broker hosts for the primary cluster
    export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    
  3. Создайте раздел с именем testtopic с помощью следующей команды:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
    
  4. С помощью этой команды проверьте, создан ли раздел:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
    

    Ответ содержит testtopic.

  5. Чтобы просмотреть сведения об узле брокера для этого (основного) кластера, выполните следующую команду:

    echo $PRIMARY_BROKERHOSTS
    

    Эта команда возвращает следующую информацию:

    10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092

    Сохраните эти сведения. Они будут использоваться в следующем разделе.

Настройка зеркального отображения

  1. Подключитесь к вторичному кластеру с помощью другого сеанса SSH:

    ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
    

    Замените sshuser именем пользователя SSH, которое использовалось при создании кластера. Замените SECONDARYCLUSTER именем, которое использовалось при создании кластера.

    Дополнительные сведения см. в статье Использование SSH с Hadoop на основе Linux в HDInsight из Linux, Unix или OS X.

  2. Используйте файл consumer.properties для настройки обмена данными с основным кластером. Чтобы создать файл, используйте следующую команду:

    nano consumer.properties
    

    Добавьте в файл consumer.properties следующее содержимое:

    bootstrap.servers=PRIMARY_BROKERHOSTS
    group.id=mirrorgroup
    

    Замените PRIMARY_BROKERHOSTS на IP-адрес узла брокера из основного кластера.

    Этот файл содержит сведения о получателе, которые используются при считывании данных из основного кластера Kafka. Дополнительные сведения см. в статье Пользовательская конфигурация в kafka.apache.org.

    Чтобы сохранить файл, нажмите Ctrl+X, нажмите Y, а затем нажмите Enter.

  3. Перед настройкой производителя, который обменивается данными с дополнительным кластером, настройте переменную для IP-адресов брокера вторичного кластера. Для создания этой переменной используйте следующие команды:

    export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
    

    Команда echo $SECONDARY_BROKERHOSTS возвращает информацию приблизительно следующего содержания:

    10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092

  4. Используйте файл producer.properties для обмена данными со вторичным кластером. Чтобы создать файл, используйте следующую команду:

    nano producer.properties
    

    Добавьте в файл producer.properties следующее содержимое:

    bootstrap.servers=SECONDARY_BROKERHOSTS
    compression.type=none
    

    Замените SECONDARY_BROKERHOSTS IP-адресами брокера, которые использовались на предыдущем шаге.

    Дополнительные сведения см. в статье Конфигурация производителя в kafka.apache.org.

  5. Используйте следующие команды, чтобы создать переменную среды с IP-адресами узлов ZooKeeper для вторичного кластера:

    # get the ZooKeeper hosts for the secondary cluster
    export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
    
  6. В конфигурации по умолчанию для Kafka в HDInsight не предусматривается автоматическое создание разделов. Прежде чем начать процесс зеркального отображения, используйте один из следующих параметров:

    • Создание разделов на вторичном кластере. Этот параметр также позволяет задать число секций и коэффициент репликации.

      Вы можете создать разделы заранее, выполнив следующую команду:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
      

      Замените testtopic именем создаваемого раздела.

    • Настройте кластер на автоматическое создание тем. Этот параметр позволяет MirrorMaker автоматически создавать темы. Обратите внимание, что он может создавать их с другим количеством разделов или с другим фактором репликации, отличным от основной темы.

      Чтобы настроить автоматическое создание разделов во вторичном кластере, выполните следующие действия.

      1. Перейдите на панель мониторинга Ambari для вторичного кластера: https://SECONDARYCLUSTERNAME.azurehdinsight.net.
      2. Выберите Службы>Kafka. Выберите вкладку Конфигурации.
      3. В поле Фильтр введите значение параметра auto.create. Будет отфильтрован список свойств и отобразится параметр auto.create.topics.enable.
      4. Измените значение параметра auto.create.topics.enable на true и выберите Сохранить. Добавьте заметку и выберите Сохранить еще раз.
      5. Выберите службу Kafka и щелкните Перезапустить, а затем выберите Restart all affected (Перезапустить все затронутые). Когда появится запрос, выберите Conform Restart All (Подтвердить перезапуск всех).

      Снимок экрана: включение автоматического создания разделов в службе kafka.

Запуск MirrorMaker

Примечание

В этой статье содержатся ссылки на термин, который корпорация Майкрософт больше не использует. Когда этот термин будет удален из программного обеспечения, мы удалим его из статьи.

  1. Чтобы запустить процесс MirrorMaker, выполните следующие команды в рамках SSH-подключения к вторичному кластеру:

    /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
    

    В этом примере используются следующие параметры.

    Параметр Описание
    --consumer.config указывает файл, который содержит свойства получателя. Эти свойства используются для создания получателя, который считывает данные из основного кластера Kafka.
    --producer.config указывает файл, который содержит свойства производителя. Эти свойства используются для создания производителя, который записывает данные во вторичный кластер Kafka.
    --whitelist Список разделов, которые MirrorMaker реплицирует из основного кластера во вторичный.
    --num.streams число создаваемых потоков получателя.

    Потребитель на вторичном узле ожидает получения сообщений.

  2. Чтобы запустить производитель и отправить сообщения в раздел, выполните следующую команду из SSH-подключения к основному кластеру:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
    

    При переходе в пустую строку с курсором введите несколько текстовых сообщений. Сообщения отправляются в раздел в основном кластере. По завершении нажмите клавиши Ctrl+C, чтобы завершить процесс производителя.

  3. Чтобы завершить процесс MirrorMaker, нажмите клавиши Ctrl+C в рамках SSH-подключения к вторичному кластеру. Завершение этого процесса может занять несколько секунд. Чтобы проверить состояние репликации сообщений во вторичный кластер, используйте следующую команду:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
    

    Теперь в списке разделов есть раздел testtopic, который создается, когда MirrorMaster зеркально отражает раздел из основного кластера во вторичный. Сообщения, полученные из раздела, совпадают с введенными в основном кластере.

Удаление кластера

Предупреждение

Счета за кластеры HDInsight выставляются пропорционально в минутах, независимо от их использования. Обязательно удалите кластер, когда завершите его использование. Дополнительные сведения см. в статье Удаление кластера HDInsight с помощью браузера, PowerShell или классического интерфейса Azure CLI.

Действия, описанные в этом документе, создают кластеры в разных группах ресурсов Azure. Чтобы удалить все созданные ресурсы, можно удалить две созданные группы ресурсов: kafka-primary-rg и kafka-secondary-rg. При удалении групп ресурсов удаляются все ресурсы, созданные в следующем документе, включая кластеры, виртуальные сети и учетные записи хранения.

Дальнейшие действия

Из этого документа вы узнали, как создать реплику кластера Apache Kafka с помощью MirrorMaker. Другие материалы, посвященные работе с Kafka, доступны по следующим ссылкам: