Использование Apache Flink с Центрами событий Azure для Apache Kafka

В этом руководстве показано, как подключить Apache Flink к концентратору событий без изменения клиентов протокола или запуска собственных кластеров. Дополнительные сведения о поддержке концентраторами событий протокола потребителя Apache Kafka см. в разделе Центры событий для Apache Kafka.

В этом руководстве описано следующее:

  • Создание пространства имен в Центрах событий
  • Клонирование примера проекта
  • Запуск производителя Flink
  • Запуск потребителя Flink

Примечание

Этот пример можно найти на сайте GitHub.

Предварительные требования

Для работы с этим руководством выполните следующие предварительные требования:

Создание пространства имен Центров событий

Для отправки и получения данных из любой службы концентраторов событий требуется пространство имен концентраторов событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Скопируйте строку подключения к Центрам событий для дальнейшего использования.

Клонирование примера проекта

Теперь, когда у вас есть строка подключения концентраторов событий, клонируйте центры событий Azure для репозитория Kafka и перейдите во вложенную папку flink:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

Используя предоставленный пример производителя Flink, отправьте сообщения в службу Центров событий.

Предоставление конечной точки Kafka в Центрах событий

producer.config

Обновите значения bootstrap.servers и sasl.jaas.config в producer/src/main/resources/producer.config, чтобы перенаправить производителя на конечную точку Kafka Центров событий с правильной аутентификацией.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Важно!

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Запуск производителя из командной строки

Чтобы запустить производитель из командной строки, создайте JAR-файл, а затем запустите его из Maven (или создайте JAR-файл с помощью Maven, затем запустите его в Java, добавив необходимые JAR-файлы Kafka в путь к классу):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

Теперь производитель начнет отправлять события в концентратор событий в теме test и печатать события в стандартный вывод.

Используя предоставленный пример потребителя, получите сообщения от концентратора событий.

Предоставление конечной точки Kafka в Центрах событий

consumer.config

Обновите значения bootstrap.servers и sasl.jaas.config в consumer/src/main/resources/consumer.config, чтобы перенаправить потребителя на конечную точку Kafka в Центрах событий с правильной аутентификацией.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

Важно!

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Запуск потребителя из командной строки

Чтобы запустить потребителя из командной строки, создайте JAR-файл, а затем запустите его из Maven (или создайте JAR-файл с помощью Maven, затем запустите его в Java, добавив необходимые JAR-файлы Kafka в путь к классу):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

Если в концентраторе событий есть события (например, если ваш продюсер также работает), то теперь потребитель начинает получать события из темы test.

Дополнительные сведения о подключении Flink к Kafka см. в руководстве по соединителю Kafka Flink.

Дальнейшие действия

Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях: