Использование Akka Streams с Центрами событий для Apache Kafka
В этом руководстве показано, как подключить потоки Akka через поддержку концентраторов событий для Apache Kafka без изменения клиентов протокола или запуска собственных кластеров.
В этом руководстве описано следующее:
- Создание пространства имен в Центрах событий
- Клонирование примера проекта
- Запуск производителя Akka Streams
- Запуск потребителя Akka Streams
Примечание
Этот пример можно найти на сайте GitHub.
Предварительные требования
Для работы с этим руководством выполните следующие предварительные требования:
- Прочтите статью Центры событий Azure для Apache Kafka.
- Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начать работу.
-
Комплект разработчика Java (JDK) 1.8+.
- В Ubuntu выполните команду
apt-get install default-jdk
, чтобы установить JDK. - Обязательно настройте переменную среды JAVA_HOME так, чтобы она указывала на папку, в которой установлен пакет JDK.
- В Ubuntu выполните команду
-
Скачайте и установите двоичный архив Maven.
- В Ubuntu выполните команду
apt-get install maven
, чтобы установить Maven.
- В Ubuntu выполните команду
-
Git;
- В Ubuntu выполните команду
sudo apt-get install git
, чтобы установить Git.
- В Ubuntu выполните команду
Создание пространства имен Центров событий
Для отправки и получения данных из любой службы концентраторов событий требуется пространство имен концентраторов событий. См. раздел Создание концентратора событий для получения подробной информации. Скопируйте строку подключения к Центрам событий для дальнейшего использования.
Клонирование примера проекта
Теперь, когда у вас есть строка подключения концентраторов событий, клонируйте центры событий Azure для репозитория Kafka и перейдите во вложенную папку akka
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java
Запуск производителя Akka Streams
Используя предоставленный пример производителя Akka Streams, отправьте сообщения в службу Центров событий.
Предоставление конечной точки Kafka в Центрах событий
application.conf производителя
Обновите значения bootstrap.servers
и sasl.jaas.config
в producer/src/main/resources/application.conf
, чтобы перенаправить производителя на конечную точку Kafka Центров событий с правильной аутентификацией.
akka.kafka.producer {
#Akka Kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
Важно!
Замените {YOUR.EVENTHUBS.CONNECTION.STRING}
строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Запуск производителя из командной строки
Чтобы запустить производитель из командной строки, создайте JAR-файл, а затем запустите его из Maven (или создайте JAR-файл с помощью Maven, затем запустите его в Java, добавив необходимые JAR-файлы Kafka в путь к классу):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"
Производитель начинает отправлять события в концентратор событий в теме test
и печатает события в стандартный вывод.
Запуск потребителя Akka Streams
Используя предоставленный пример потребителя, получите сообщения от концентратора событий.
Предоставление конечной точки Kafka в Центрах событий
application.conf потребителя
Обновите значения bootstrap.servers
и sasl.jaas.config
в consumer/src/main/resources/application.conf
, чтобы перенаправить потребителя на конечную точку Kafka в Центрах событий с правильной аутентификацией.
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
Важно!
Замените {YOUR.EVENTHUBS.CONNECTION.STRING}
строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Запуск потребителя из командной строки
Чтобы запустить потребителя из командной строки, создайте JAR-файл, а затем запустите его из Maven (или создайте JAR-файл с помощью Maven, затем запустите его в Java, добавив необходимые JAR-файлы Kafka в путь к классу):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"
Если в концентраторе событий есть события (например, если ваш продюсер также работает), то потребитель начинает получать события из темы test
.
Дополнительные сведения об Akka Streams см. в руководстве по Akka Streams Kafka.
Дальнейшие действия
Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях:
- Зеркальное отображение брокера Kafka в концентраторе событий
- Подключение Apache Spark к концентратору событий
- Подключение Apache Flink к концентратору событий
- Интеграция Kafka Connect с концентратором событий
- Migrating to Azure Event Hubs for Apache Kafka Ecosystems (Переход в Центры событий Azure для экосистем Apache Kafka)
- Руководство разработчика Apache Kafka для концентраторов событий Azure