Интеграция поддержки Apache Kafka Connect в Центрах событий Azure

Apache Kafka Connect — это платформа для подключения, импорта и экспорта данных в любую внешнюю систему или из нее, например MySQL, HDFS и файловую систему, через кластер Kafka. В этом руководстве описано использование платформы Kafka Connect с Центрами событий.

В этом руководстве описана интеграция Kafka Connect с Центрами событий и развертывание соединителей FileStreamSource и FileStreamSink. Хотя эти соединители не предназначены для использования в рабочей среде, они демонстрируют комплексный сценарий Kafka Подключение, где Центры событий Azure выступает в качестве брокера Kafka.

Примечание.

Этот пример можно найти на сайте GitHub.

При работе с этим руководством вы выполните следующие задачи:

  • Создание пространства имен в Центрах событий Azure
  • Клонирование примера проекта
  • Настройка Kafka Connect для Центров событий
  • Выполнение Kafka Connect
  • Создание соединителей

Необходимые компоненты

Для работы с этим пошаговым руководством выполните следующие предварительные требования:

Создание пространства имен в Центрах событий Azure

Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Центров событий и полное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).

Клонирование примера проекта

Клонируйте репозиторий Центров событий Azure и перейдите к руководствам или вложенной папке подключения:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Настройка Kafka Connect для Центров событий

Минимальная перенастройка необходима при перенаправлении пропускной способности Kafka в Центры событий. Следующий пример connect-distributed.properties показывает, как настроить Connect для проверки подлинности и обмена данных с конечной точкой Kafka в Центрах событий:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

Внимание

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Выполнение Kafka Connect

На этом этапе рабочая роль Kafka Connect запускается локально в распределенном режиме с помощью Центров событий для поддержания состояния кластера.

  1. Сохраните файл connect-distributed.properties, указанный выше, локально. Не забудьте заменить все значения в фигурных скобках.
  2. Перейдите к расположению выпуска Kafka на своем компьютере.
  3. Запустите ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. REST API рабочей роли Connect готов к взаимодействию при появлении 'INFO Finished starting connectors and tasks'.

Примечание.

Kafka Connect использует API Kafka AdminClient для автоматического создания разделов с рекомендуемыми конфигурациями, включая сжатие. Быстрая проверка пространства имен на портале Azure показывает, что внутренние разделы рабочей роли Connect были созданы автоматически.

Внутренние разделы Kafka Connect должны использовать сжатия. Команда Центров событий не несет ответственности за исправление неправильных конфигураций в случае ненадлежащей настройки внутренних разделов Connect.

Создание соединителей

В этом разделе показано использование соединителей FileStreamSource и FileStreamSink.

  1. Создайте каталог для файлов входных и выходных данных.

    mkdir ~/connect-quickstart
    
  2. Создайте два файла. Один файл с начальными данными, которые читает соединитель FileStreamSource, и другой, в который соединитель FileStreamSink выполняет запись.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Создайте соединитель FileStreamSource. Обязательно замените фигурные скобки своим домашним каталогом.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    После выполнения приведенной выше команды вы увидите концентратор connect-quickstart событий в экземпляре Центров событий.

  4. Проверьте состояние соединителя источника.

    curl -s http://localhost:8083/connectors/file-source/status
    

    При необходимости, чтобы убедиться, что события были получены из раздела connect-quickstart, можно использовать Explorer служебной шины.

  5. Создайте соединитель FileStreamSink. Еще раз убедитесь, что вы заменили фигурные скобки своим путем к домашнему каталогу.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Проверьте состояние соединителя приемника.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Убедитесь, что данные реплицированы между файлами и что данные будут одинаковыми для обоих файлов.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Очистка

Kafka Подключение создает разделы Центров событий для хранения конфигураций, смещения и состояния, которые сохраняются даже после того, как кластер Подключение был снят. Если этого не требуется, рекомендуется удалить эти разделы. Вы также можете удалить connect-quickstart центры событий, созданные во время этого пошагового руководства.

Следующие шаги

Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях: