Udostępnij przez


Łączenie aplikacji platformy Apache Spark z usługą Azure Event Hubs

Ten samouczek przeprowadzi Cię przez proces łączenia aplikacji Spark z usługą Event Hubs na potrzeby przesyłania strumieniowego w czasie rzeczywistym. Ta integracja umożliwia przesyłanie strumieniowe bez konieczności zmiany klientów protokołu czy uruchamiania własnych klastrów platformy Kafka lub Zookeeper. Ten samouczek wymaga oprogramowania Apache Spark w wersji 2.4+ i Apache Kafka w wersji 2.0+.

Uwaga

Ten przykład jest dostępny w witrynie GitHub

Z tego samouczka dowiesz się, jak wykonywać następujące działania:

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Klonowanie projektu przykładowego
  • Uruchamianie platformy Spark
  • Odczyt z usługi Event Hubs dla platformy Kafka
  • Zapisywanie w usłudze Event Hubs dla platformy Kafka

Wymagania wstępne

Przed rozpoczęciem tego samouczka upewnij się, że masz następujące elementy:

Uwaga

Adapter Spark-Kafka został zaktualizowany do obsługi Kafka w wersji 2.0 od Spark w wersji 2.4. W poprzednich wersjach oprogramowania Spark adapter obsługiwał platformę Kafka w wersji 0.10 lub nowszej, ale używał konkretnie interfejsów API platformy Kafka w wersji 0.10. Ponieważ Event Hubs dla systemu Kafka nie obsługuje wersji Kafka 0.10, adaptery Spark-Kafka z wersji Spark wcześniejszych niż 2.4 nie są obsługiwane przez Event Hubs dla ekosystemów Kafka.

Tworzenie przestrzeni nazw usługi Event Hubs

Przestrzeń nazw Event Hubs jest wymagana do wysyłania i odbierania ze wszystkich usług Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string.

Klonowanie projektu przykładowego

Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/spark :

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Odczyt z usługi Event Hubs dla platformy Kafka

Po kilku zmianach konfiguracji możesz rozpocząć odczytywanie z usługi Event Hubs dla platformy Kafka. Zaktualizuj BOOTSTRAP_SERVERS i EH_SASL przy użyciu szczegółów z przestrzeni nazw i możesz rozpocząć przesyłanie strumieniowe za pomocą usługi Event Hubs, tak jak w przypadku platformy Kafka. Pełny przykładowy kod można znaleźć w pliku sparkConsumer.scala w witrynie GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Jeśli wystąpi błąd podobny do poniższego błędu, dodaj .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") do wywołania spark.readStream i spróbuj ponownie.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Zapisywanie w usłudze Event Hubs dla platformy Kafka

Możesz również pisać do Event Hubs w taki sam sposób, jak piszesz do Kafki. Nie zapomnij zaktualizować konfiguracji w celu zmiany BOOTSTRAP_SERVERS i EH_SASL przy użyciu informacji z przestrzeni nazw Event Hubs. Pełny przykładowy kod można znaleźć w pliku sparkProducer.scala w witrynie GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: