Apache Spark uygulamanızı Azure Event Hubs bağlama

Bu öğretici, gerçek zamanlı akış için Spark uygulamanızı Event Hubs'a bağlama işleminde size yol gösterir. Bu tümleştirme, protokol istemcilerinizi değiştirmek veya kendi Kafka veya Zookeeper kümelerinizi çalıştırmak zorunda kalmadan akışa olanak tanır. Bu öğretici için Apache Spark v2.4+ ve Apache Kafka v2.0+ gerekir.

Not

Bu örnek GitHub'da kullanılabilir

Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:

  • Event Hubs ad alanı oluşturma
  • Örnek projeyi kopyalama
  • Spark'ı çalıştırma
  • Kafka için Event Hubs'dan okuma
  • Kafka için Event Hubs'a yazma

Önkoşullar

Bu öğreticiye başlamadan önce şunlara sahip olduğunuzdan emin olun:

Not

Spark-Kafka bağdaştırıcısı Spark v2.4'ten itibaren Kafka v2.0'ı destekleyecek şekilde güncelleştirildi. Spark'ın önceki sürümlerinde, bağdaştırıcı Kafka v0.10 ve üstünü destekliyordu ama özel olarak Kafka v0.10 API'lerine dayanıyordu. Kafka için Event Hubs Kafka v0.10'u desteklemediğinden, Spark'ın v2.4'ten önceki sürümlerinden Spark-Kafka bağdaştırıcıları Kafka için Event Hubs Ekosistemlerinde desteklenmez.

Event Hubs ad alanı oluşturma

Herhangi bir Event Hubs hizmetinden göndermek ve almak için Event Hubs ad alanı gereklidir. Ad alanı ve olay hub'ı oluşturma yönergeleri için bkz. Olay hub'ı oluşturma. Daha sonra kullanmak üzere Event Hubs bağlantı dizesini ve tam etki alanı adını (FQDN) alın. Yönergeler için bkz. Event Hubs bağlantı dizesi alma.

Örnek projeyi kopyalama

Azure Event Hubs deposunu kopyalayın ve tutorials/spark alt klasörüne gidin:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Kafka için Event Hubs'dan okuma

Birkaç yapılandırma değişikliğiyle, Kafka için Event Hubs'dan okumaya başlayabilirsiniz. BOOTSTRAP_SERVERS ve EH_SASL öğelerini ad alanınızdan gelen ayrıntılarla güncelleştirin. Bundan sonra aynı Kafka'yla yaptığınız gibi Event Hubs ile akışı başlatabilirsiniz. Örnek kodun tamamı için GitHub'da sparkConsumer.scala dosyasına bakın.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Aşağıdakine benzer bir hata alırsanız, çağrısına spark.readStream ekleyin .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") ve yeniden deneyin.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Kafka için Event Hubs'a yazma

Kafka'ya yazdığınız gibi Event Hubs'a da yazabilirsiniz. Yapılandırmanızı güncelleştirip BOOTSTRAP_SERVERS ve EH_SASL öğelerini Event Hubs ad alanınızdan gelen bilgilerle değiştirmeyi unutmayın. Örnek kodun tamamı için GitHub'da sparkProducer.scala dosyasına bakın.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Sonraki adımlar

Kafka için Event Hubs ve Event Hubs hakkında daha fazla bilgi edinmek için aşağıdaki makalelere bakın: