Интеграция поддержки Apache Kafka Connect в Центрах событий Azure
Apache Kafka Connect — это платформа для подключения, импорта и экспорта данных в любую внешнюю систему или из нее, например MySQL, HDFS и файловую систему, через кластер Kafka. В этой статье описывается использование платформы Kafka Connect с центрами событий.
В этой статье описывается интеграция Kafka Connect с концентратором событий и развертывание базовых FileStreamSource
и FileStreamSink
соединителей. Хотя эти соединители не предназначены для использования в рабочей среде, они демонстрируют комплексный сценарий Kafka Connect, где Центры событий Azure выступает в качестве брокера Kafka.
Примечание.
Этот пример можно найти на сайте GitHub.
Необходимые компоненты
Для работы с этим пошаговым руководством выполните следующие предварительные требования:
- Подписка Azure. Если ее нет, создайте бесплатную учетную запись.
- Git
- Linux/MacOS
- Последняя версия Kafka доступна из kafka.apache.org
- Ознакомьтесь со статьей Центры событий Azure для Apache Kafka (предварительная версия).
Создание пространства имен в Центрах событий Azure
Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Центров событий и полное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).
Клонирование примера проекта
Клонируйте репозиторий Центров событий Azure и перейдите к руководствам или вложенной папке подключения:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Настройка Kafka Connect для Центров событий
Минимальная перенастройка необходима при перенаправлении пропускной способности Kafka в Центры событий. Следующий пример connect-distributed.properties
показывает, как настроить Connect для проверки подлинности и обмена данных с конечной точкой Kafka в Центрах событий:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Внимание
Замените {YOUR.EVENTHUBS.CONNECTION.STRING}
строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Выполнение Kafka Connect
На этом этапе рабочая роль Kafka Connect запускается локально в распределенном режиме с помощью Центров событий для поддержания состояния кластера.
- Сохраните
connect-distributed.properties
файл локально. Не забудьте заменить все значения в фигурных скобках. - Перейдите к расположению выпуска Kafka на своем компьютере.
- Запустите
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. REST API рабочей роли Connect готов к взаимодействию при появлении'INFO Finished starting connectors and tasks'
.
Примечание.
Kafka Connect использует API Kafka AdminClient для автоматического создания разделов с рекомендуемыми конфигурациями, включая сжатие. Быстрая проверка пространства имен на портале Azure показывает, что внутренние разделы рабочей роли Connect были созданы автоматически.
Внутренние разделы Kafka Connect должны использовать сжатия. Команда Центров событий не несет ответственности за исправление неправильных конфигураций в случае ненадлежащей настройки внутренних разделов Connect.
Создание соединителей
В этом разделе описано, как выполнить спиннинг FileStreamSource
и FileStreamSink
соединители.
Создайте каталог для файлов входных и выходных данных.
mkdir ~/connect-quickstart
Создайте два файла: один файл с начальными данными, из которых
FileStreamSource
считывается соединитель, а другой — запись соединителяFileStreamSink
.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Создайте соединитель
FileStreamSource
. Обязательно замените фигурные скобки своим домашним каталогом.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
После выполнения команды вы увидите концентратор
connect-quickstart
событий в экземпляре Центров событий.Проверьте состояние соединителя источника.
curl -s http://localhost:8083/connectors/file-source/status
При необходимости можно использовать обозреватель служебная шина для проверки того, что события прибыли в
connect-quickstart
раздел.Создайте соединитель FileStreamSink. Еще раз убедитесь, что вы заменили фигурные скобки своим путем к домашнему каталогу.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Проверьте состояние соединителя приемника.
curl -s http://localhost:8083/connectors/file-sink/status
Убедитесь, что данные реплицированы между файлами и что данные будут одинаковыми для обоих файлов.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Очистка
Kafka Connect создает разделы Центров событий для хранения конфигураций, смещения и состояния, которые сохраняются даже после того, как кластер Connect был снят. Если эта сохраняемость не требуется, рекомендуется удалить эти разделы. Вы также можете удалить connect-quickstart
центры событий, созданные во время этого пошагового руководства.
Связанный контент
Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях:
- Руководство разработчика Apache Kafka для концентраторов событий Azure
- Migrating to Azure Event Hubs for Apache Kafka Ecosystems (Переход в Центры событий Azure для экосистем Apache Kafka)