Подключение приложения Apache Spark с помощью Центров событий Azure
В этом руководстве описывается, как подключить приложение Spark к Центрам событий для потоковой передачи в режиме реального времени. Такая интеграция обеспечивает потоковую передачу без необходимости изменять клиенты протокола или запускать собственные кластеры Kafka или Zookeeper. Для работы с этим руководством требуется Apache Spark v2.4+ и Apache Kafka v2.0+.
Примечание
Этот пример можно найти на сайте GitHub.
В этом руководстве описано следующее:
- Создание пространства имен в Центрах событий
- Клонирование примера проекта
- Запуск Spark.
- Чтение из Центров событий для Kafka.
- Запись в Центры событий для Kafka.
Предварительные требования
Перед началом работы с данным руководством необходимо выполнить описанные ниже условия.
- Подписка Azure. Если ее нет, создайте бесплатную учетную запись.
- Apache Spark v2.4.
- Apache Kafka v2.0.
- Git;
Примечание
Адаптер Spark-Kafka был обновлен для поддержки Kafka v2.0, начиная со Spark v2.4. В предыдущих выпусках Spark адаптер поддерживал Kafka v0.10 и более поздние версии, но в основном как база использовались API Kafka v0.10. Так как Центры событий для Kafka не поддерживают Kafka v0.10, адаптеры Spark-Kafka от версий Spark до версии 2.4 не поддерживаются Центрами событий для экосистем Kafka.
Создание пространства имен Центров событий
Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Центров событий и полное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).
Клонирование примера проекта
Клонируйте репозиторий Центров событий Azure и перейдите к вложенной папке tutorials/spark
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Чтение из Центров событий для Kafka.
Внеся несколько изменений в конфигурации, вы сможете запустить чтение из Центров событий для Kafka. Обновите BOOTSTRAP_SERVERS и EH_SASL сведениями о пространстве имен, чтобы запустить потоковую передачу с помощью Центров событий так же, как с Kafka. Чтобы ознакомиться с полным примером кода, просмотрите файл sparkConsumer.scala на сайте GitHub.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Если появится сообщение об ошибке, аналогичное приведенной ниже, добавьте .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
в вызов и повторите попытку spark.readStream
.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Запись в Центры событий для Kafka.
Вы также можете выполнить запись в Центры событий таким же образом, как и в Kafka. Обновите конфигурацию, чтобы заменить сведения в BOOTSTRAP_SERVERS и EH_SASL сведениями о пространстве имен в Центре событий. Чтобы ознакомиться с полным примером кода, просмотрите файл sparkProducer.scala на сайте GitHub.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Дальнейшие действия
Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях:
- Зеркальное отображение брокера Kafka в концентраторе событий
- Подключение Apache Flink к концентратору событий
- Интеграция Kafka Connect с концентратором событий
- Migrating to Azure Event Hubs for Apache Kafka Ecosystems (Переход в Центры событий Azure для экосистем Apache Kafka)
- Подключение Akka Streams к концентратору событий
- Руководство разработчика Apache Kafka для концентраторов событий Azure