Обучение
Модуль
Расширенное преобразование потоковых данных с помощью Apache Spark и Kafka в Azure HDInsight
Этот браузер больше не поддерживается.
Выполните обновление до Microsoft Edge, чтобы воспользоваться новейшими функциями, обновлениями для системы безопасности и технической поддержкой.
Узнайте, как реплицировать разделы во вторичный кластер с помощью функции зеркального отображения Apache Kafka. Зеркальное отображение можно выполнять как непрерывный процесс или периодически для переноса данных из одного кластера в другой.
В этой статье зеркальное отображение используется для репликации разделов между двумя кластерами HDInsight. Эти кластеры находятся в разных виртуальных сетях и в разных центрах обработки данных.
Предупреждение
Не используйте зеркальное отображение как средство для достижения отказоустойчивости. Из-за разного смещения элементов в разделах основного и вторичного кластеров они не могут использоваться как взаимозаменяемые. Если вас интересует отказоустойчивость, настройте репликацию для разделов в пределах кластера. Дополнительные сведения см. в статье по началу работы с Apache Kafka в HDInsight.
Зеркальное отображение работает с помощью средства MirrorMaker, которое является частью Apache Kafka. MirrorMaker использует записи из разделов в основном кластере, а затем создает локальную копию в дополнительном кластере. MirrorMaker использует один получатель (или несколько), которые считывают основной кластер, и производитель, который записывает данные в локальный (вторичный) кластер.
В наиболее полезной настройке зеркального отображения для аварийного восстановления используются кластеры Kafka в разных регионах Azure. Для этого виртуальные сети, в которых находятся кластеры, находятся в одноранговом узле.
На следующей схеме показан процесс зеркального отображения и обмен данными между кластерами.
Основной и вторичный кластеры могут отличаться количеством узлов и секций. Смещения в темах также отличаются. Зеркальное отображение сохраняет значение ключа, который используется для секционирования, поэтому порядок записей сохраняется вместе с этими значениями.
Чтобы реализовать зеркальное отображение между кластерами Kafka в разных сетях, следует учитывать следующее.
Шлюзы: сети должны взаимодействовать на уровне TCP/IP.
Серверная адресация: можно выбрать адреса узлов кластера, используя их IP-адреса или полные доменные имена.
IP-адреса: если вы настраиваете кластеры Kafka для использования объявления IP-адресов, продолжить установку зеркального отображения можно, используя IP-адреса узлов брокера и узлов ZooKeeper.
Доменные имена: если вы не настраиваете кластеры Kafka для объявления IP-адресов, кластеры должны иметь возможность подключаться друг к другу с помощью полных доменных имен (FQDN). Потребуется установить в каждой сети сервер службы доменных имен (DNS), который настроен для пересылки запросов в другие сети. При создании виртуальной сети Azure вместо использования автоматического DNS, предоставленного сетью, необходимо указать пользовательские DNS-сервер и IP-адрес для сервера. После создания виртуальной сети необходимо создать виртуальную машину Azure, которая использует этот IP-адрес. Затем на ней устанавливается и настраивается программное обеспечение DNS.
Важно!
Создайте и настройте пользовательский DNS-сервер, прежде чем устанавливать HDInsight в виртуальной сети. HDInsight не нужно дополнительно настраивать для использования DNS-сервера, настроенного для виртуальной сети.
Дополнительные сведения о подключении двух виртуальных сетей Azure см. в статье Настройка подключения.
Эта архитектура включает в себя два кластера в разных группах ресурсов и виртуальных сетях: первичный и вторичный.
Создание двух новых групп ресурсов
Группа ресурсов | Расположение |
---|---|
kafka-primary-rg | Центральная часть США |
kafka-secondary-rg | Центрально-северная часть США |
Создайте новую виртуальную сеть kafka-primary-vnet в расположении kafka-primary-rg. Оставьте параметры по умолчанию.
Создайте новую виртуальную сеть kafka-secondary-vnet в расположении kafka-secondary-rg, также с параметрами по умолчанию.
Создайте два новых кластера Kafka:
Имя кластера | Группа ресурсов | Виртуальная сеть | Учетная запись хранения |
---|---|---|---|
kafka-primary-cluster | kafka-primary-rg | kafka-primary-vnet | kafkaprimarystorage |
kafka-secondary-cluster | kafka-secondary-rg | kafka-secondary-vnet | kafkasecondarystorage |
Создайте пиринг между виртуальными сетями. На этом шаге будет создано два пиринга: один из kafka-primary-vnet в kafka-secondary-vnet и один наоборот из kafka-secondary-vnet в kafka-primary-vnet.
Выберите виртуальную сеть kafka-primary-vnet.
В разделе Параметры щелкните Пиринги.
Выберите Добавить.
На экране Добавить пиринг введите сведения, как показано на снимке экрана ниже.
Настройте объявления IP-адресов, чтобы разрешить клиенту подключаться по IP-адресам брокера вместо доменных имен.
Перейдите на панель мониторинга Ambari для основного кластера: https://PRIMARYCLUSTERNAME.azurehdinsight.net
.
Выберите Службы>Kafka. Выберите вкладку Конфигурации .
Добавьте следующие строки конфигурации в нижнюю секцию шаблон kafka-env. Щелкните Сохранить.
# Configure Kafka to advertise IP addresses instead of FQDN
IP_ADDRESS=$(hostname -i)
echo advertised.listeners=$IP_ADDRESS
sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
Введите примечание на экране Сохранения конфигурации и выберите Сохранить.
Если вы получаете предупреждение о конфигурации, выберите Все равно продолжить.
В области Сохранить изменения конфигурации выберите OK.
У уведомлении Требуется перезагрузка выберите Перезапустить>Перезапустить все затронутое. Нажмите Подтвердить перезапуск всех.
PLAINTEXT://0.0.0.0:9092
.Выберите Узлы на панели мониторинга Ambari.
Запишите IP-адреса для брокера и ZooKeeper. Узлы брокера имеют wn в качестве первых двух букв имени узла, а узлы ZooKeeper имеют zk в качестве первых двух букв имени узла.
Повторите предыдущие три шага для второго кластера kafka-secondary-cluster. Настройте объявления IP-адресов, установите прослушиватели, а также запишите IP-адреса брокера иZooKeeper.
Подключитесь к основному кластеру с помощью SSH.
ssh sshuser@PRIMARYCLUSTER-ssh.azurehdinsight.net
Замените sshuser
именем пользователя SSH, которое использовалось при создании кластера. Замените PRIMARYCLUSTER
базовым именем, которое использовалось при создании кластера.
Дополнительные сведения см. в статье Использование SSH с Hadoop на основе Linux в HDInsight из Linux, Unix или OS X.
Используйте следующую команду, чтобы создать две переменные среды с узлами Apache ZooKeeper и брокера для основного кластера. Замените такие строки, как ZOOKEEPER_IP_ADDRESS1
, фактическими IP-адресами, записанными ранее, например 10.23.0.11
и 10.23.0.7
. То же самое касается BROKER_IP_ADDRESS1
. Если вы используете разрешение полного доменного имени для настраиваемого DNS-сервера, выполните указанные далее действия, чтобы получить имена брокера и ZooKeeper.
# get the ZooKeeper hosts for the primary cluster
export PRIMARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181, ZOOKEEPER_IP_ADDRESS2:2181, ZOOKEEPER_IP_ADDRESS3:2181'
# get the broker hosts for the primary cluster
export PRIMARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
Создайте раздел с именем testtopic
с помощью следующей команды:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $PRIMARY_ZKHOSTS
С помощью этой команды проверьте, создан ли раздел:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $PRIMARY_ZKHOSTS
Ответ содержит testtopic
.
Чтобы просмотреть сведения об узле брокера для этого (основного) кластера, выполните следующую команду:
echo $PRIMARY_BROKERHOSTS
Эта команда возвращает следующую информацию:
10.23.0.11:9092,10.23.0.7:9092,10.23.0.9:9092
Сохраните эти сведения. Они будут использоваться в следующем разделе.
Подключитесь к вторичному кластеру с помощью другого сеанса SSH:
ssh sshuser@SECONDARYCLUSTER-ssh.azurehdinsight.net
Замените sshuser
именем пользователя SSH, которое использовалось при создании кластера. Замените SECONDARYCLUSTER
именем, которое использовалось при создании кластера.
Дополнительные сведения см. в статье Использование SSH с Hadoop на основе Linux в HDInsight из Linux, Unix или OS X.
Используйте файл consumer.properties
для настройки обмена данными с основным кластером. Чтобы создать файл, используйте следующую команду:
nano consumer.properties
Добавьте в файл consumer.properties
следующее содержимое:
bootstrap.servers=PRIMARY_BROKERHOSTS
group.id=mirrorgroup
Замените PRIMARY_BROKERHOSTS
на IP-адрес узла брокера из основного кластера.
Этот файл содержит сведения о получателе, которые используются при считывании данных из основного кластера Kafka. Дополнительные сведения см. в статье Пользовательская конфигурация в kafka.apache.org
.
Чтобы сохранить файл, нажмите Ctrl+X, нажмите Y, а затем нажмите Enter.
Перед настройкой производителя, который обменивается данными с дополнительным кластером, настройте переменную для IP-адресов брокера вторичного кластера. Для создания этой переменной используйте следующие команды:
export SECONDARY_BROKERHOSTS='BROKER_IP_ADDRESS1:9092,BROKER_IP_ADDRESS2:9092,BROKER_IP_ADDRESS2:9092'
Команда echo $SECONDARY_BROKERHOSTS
возвращает информацию приблизительно следующего содержания:
10.23.0.14:9092,10.23.0.4:9092,10.23.0.12:9092
Используйте файл producer.properties
для обмена данными со вторичным кластером. Чтобы создать файл, используйте следующую команду:
nano producer.properties
Добавьте в файл producer.properties
следующее содержимое:
bootstrap.servers=SECONDARY_BROKERHOSTS
compression.type=none
Замените SECONDARY_BROKERHOSTS
IP-адресами брокера, которые использовались на предыдущем шаге.
Дополнительные сведения см. в статье Конфигурация производителя в kafka.apache.org
.
Используйте следующие команды, чтобы создать переменную среды с IP-адресами узлов ZooKeeper для вторичного кластера:
# get the ZooKeeper hosts for the secondary cluster
export SECONDARY_ZKHOSTS='ZOOKEEPER_IP_ADDRESS1:2181,ZOOKEEPER_IP_ADDRESS2:2181,ZOOKEEPER_IP_ADDRESS3:2181'
В конфигурации по умолчанию для Kafka в HDInsight не предусматривается автоматическое создание разделов. Прежде чем начать процесс зеркального отображения, используйте один из следующих параметров:
Создание разделов на вторичном кластере. Этот параметр также позволяет задать число секций и коэффициент репликации.
Вы можете создать разделы заранее, выполнив следующую команду:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SECONDARY_ZKHOSTS
Замените testtopic
именем создаваемого раздела.
Настройте кластер на автоматическое создание тем. Этот параметр позволяет MirrorMaker автоматически создавать темы. Обратите внимание, что он может создавать их с другим количеством разделов или с другим фактором репликации, отличным от основной темы.
Чтобы настроить автоматическое создание разделов во вторичном кластере, выполните следующие действия.
https://SECONDARYCLUSTERNAME.azurehdinsight.net
.auto.create
. Будет отфильтрован список свойств и отобразится параметр auto.create.topics.enable
.auto.create.topics.enable
на true
и выберите Сохранить. Добавьте заметку и выберите Сохранить еще раз.
Примечание
В этой статье содержатся ссылки на термин, который корпорация Майкрософт больше не использует. Когда этот термин будет удален из программного обеспечения, мы удалим его из статьи.
Чтобы запустить процесс MirrorMaker, выполните следующие команды в рамках SSH-подключения к вторичному кластеру:
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
В этом примере используются следующие параметры.
Параметр | Описание |
---|---|
--consumer.config |
указывает файл, который содержит свойства получателя. Эти свойства используются для создания получателя, который считывает данные из основного кластера Kafka. |
--producer.config |
указывает файл, который содержит свойства производителя. Эти свойства используются для создания производителя, который записывает данные во вторичный кластер Kafka. |
--whitelist |
Список разделов, которые MirrorMaker реплицирует из основного кластера во вторичный. |
--num.streams |
число создаваемых потоков получателя. |
Потребитель на вторичном узле ожидает получения сообщений.
Чтобы запустить производитель и отправить сообщения в раздел, выполните следующую команду из SSH-подключения к основному кластеру:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $PRIMARY_BROKERHOSTS --topic testtopic
При переходе в пустую строку с курсором введите несколько текстовых сообщений. Сообщения отправляются в раздел в основном кластере. По завершении нажмите клавиши Ctrl+C, чтобы завершить процесс производителя.
Чтобы завершить процесс MirrorMaker, нажмите клавиши Ctrl+C в рамках SSH-подключения к вторичному кластеру. Завершение этого процесса может занять несколько секунд. Чтобы проверить состояние репликации сообщений во вторичный кластер, используйте следующую команду:
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $SECONDARY_BROKERHOSTS --topic testtopic --from-beginning
Теперь в списке разделов есть раздел testtopic
, который создается, когда MirrorMaster зеркально отражает раздел из основного кластера во вторичный. Сообщения, полученные из раздела, совпадают с введенными в основном кластере.
Предупреждение
Счета за кластеры HDInsight выставляются пропорционально в минутах, независимо от их использования. Обязательно удалите кластер, когда завершите его использование. Дополнительные сведения см. в статье Удаление кластера HDInsight с помощью браузера, PowerShell или классического интерфейса Azure CLI.
Действия, описанные в этом документе, создают кластеры в разных группах ресурсов Azure. Чтобы удалить все созданные ресурсы, можно удалить две созданные группы ресурсов: kafka-primary-rg и kafka-secondary-rg. При удалении групп ресурсов удаляются все ресурсы, созданные в следующем документе, включая кластеры, виртуальные сети и учетные записи хранения.
Из этого документа вы узнали, как создать реплику кластера Apache Kafka с помощью MirrorMaker. Другие материалы, посвященные работе с Kafka, доступны по следующим ссылкам:
Обучение
Модуль
Расширенное преобразование потоковых данных с помощью Apache Spark и Kafka в Azure HDInsight
Документация
Перенос рабочих нагрузок Apache Kafka в Azure HDInsight 5.1
Узнайте, как перенести рабочие нагрузки Apache Kafka в HDInsight 4.0 в HDInsight 5.1.
Прокси-сервер REST для Apache Kafka (Azure HDInsight)
Сведения о том, как выполнять в Azure HDInsight операции Apache Kafka с помощью прокси-сервера REST для Kafka.
Защита Spark и Kafka — сценарий интеграции потоковой передачи Spark — Azure HDInsight
Узнайте, как защитить интеграцию потоковой передачи Spark и Kafka.