Apache Spark uygulamanızı Azure Event Hubs bağlama
Bu öğretici, gerçek zamanlı akış için Spark uygulamanızı Event Hubs'a bağlama işleminde size yol gösterir. Bu tümleştirme, protokol istemcilerinizi değiştirmek veya kendi Kafka veya Zookeeper kümelerinizi çalıştırmak zorunda kalmadan akışa olanak tanır. Bu öğretici için Apache Spark v2.4+ ve Apache Kafka v2.0+ gerekir.
Not
Bu örnek GitHub'da kullanılabilir
Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:
- Event Hubs ad alanı oluşturma
- Örnek projeyi kopyalama
- Spark'ı çalıştırma
- Kafka için Event Hubs'dan okuma
- Kafka için Event Hubs'a yazma
Önkoşullar
Bu öğreticiye başlamadan önce şunlara sahip olduğunuzdan emin olun:
- Azure aboneliği. Aboneliğiniz yoksa ücretsiz bir hesap oluşturun.
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
Not
Spark-Kafka bağdaştırıcısı Spark v2.4'ten itibaren Kafka v2.0'ı destekleyecek şekilde güncelleştirildi. Spark'ın önceki sürümlerinde, bağdaştırıcı Kafka v0.10 ve üstünü destekliyordu ama özel olarak Kafka v0.10 API'lerine dayanıyordu. Kafka için Event Hubs Kafka v0.10'u desteklemediğinden, Spark'ın v2.4'ten önceki sürümlerinden Spark-Kafka bağdaştırıcıları Kafka için Event Hubs Ekosistemlerinde desteklenmez.
Event Hubs ad alanı oluşturma
Herhangi bir Event Hubs hizmetinden göndermek ve almak için Event Hubs ad alanı gereklidir. Ad alanı ve olay hub'ı oluşturma yönergeleri için bkz. Olay hub'ı oluşturma. Daha sonra kullanmak üzere Event Hubs bağlantı dizesini ve tam etki alanı adını (FQDN) alın. Yönergeler için bkz. Event Hubs bağlantı dizesi alma.
Örnek projeyi kopyalama
Azure Event Hubs deposunu kopyalayın ve tutorials/spark
alt klasörüne gidin:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Kafka için Event Hubs'dan okuma
Birkaç yapılandırma değişikliğiyle, Kafka için Event Hubs'dan okumaya başlayabilirsiniz. BOOTSTRAP_SERVERS ve EH_SASL öğelerini ad alanınızdan gelen ayrıntılarla güncelleştirin. Bundan sonra aynı Kafka'yla yaptığınız gibi Event Hubs ile akışı başlatabilirsiniz. Örnek kodun tamamı için GitHub'da sparkConsumer.scala dosyasına bakın.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Aşağıdakine benzer bir hata alırsanız, çağrısına spark.readStream
ekleyin .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
ve yeniden deneyin.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Kafka için Event Hubs'a yazma
Kafka'ya yazdığınız gibi Event Hubs'a da yazabilirsiniz. Yapılandırmanızı güncelleştirip BOOTSTRAP_SERVERS ve EH_SASL öğelerini Event Hubs ad alanınızdan gelen bilgilerle değiştirmeyi unutmayın. Örnek kodun tamamı için GitHub'da sparkProducer.scala dosyasına bakın.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Sonraki adımlar
Kafka için Event Hubs ve Event Hubs hakkında daha fazla bilgi edinmek için aşağıdaki makalelere bakın: