Łączenie aplikacji platformy Apache Spark z Azure Event Hubs

Ten samouczek przeprowadzi Cię przez proces łączenia aplikacji Spark z usługą Event Hubs na potrzeby przesyłania strumieniowego w czasie rzeczywistym. Ta integracja umożliwia przesyłanie strumieniowe bez konieczności zmieniania klientów protokołu lub uruchamiania własnych klastrów platformy Kafka lub zookeeper. Ten samouczek wymaga platformy Apache Spark w wersji 2.4 lub nowszej oraz platformy Apache Kafka w wersji 2.0 lub nowszej.

Uwaga

Ten przykład jest dostępny w witrynie GitHub

Ten samouczek zawiera informacje na temat wykonywania następujących czynności:

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Klonowanie projektu przykładowego
  • Uruchamianie platformy Spark
  • Odczyt z usługi Event Hubs dla platformy Kafka
  • Zapis do usługi Event Hubs dla platformy Kafka

Wymagania wstępne

Przed rozpoczęciem tego samouczka upewnij się, że masz następujące elementy:

Uwaga

Adapter Spark-Kafka został zaktualizowany do obsługi platformy Kafka w wersji 2.0 i platformy Spark od wersji 2.4. W poprzednich wersjach platformy Spark adapter obsługiwał platformę Kafka w wersji 0.10 i nowszych, ale bazował na interfejsach API platformy Kafka w wersji 0.10. Ponieważ usługa Event Hubs dla platformy Kafka nie obsługuje platformy Kafka w wersji 0.10, adaptery Spark-Kafka w wersjach platformy Spark wcześniejszych niż 2.4 nie są obsługiwane przez usługę Event Hubs dla ekosystemów platformy Kafka.

Tworzenie przestrzeni nazw usługi Event Hubs

Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Zobacz Tworzenie centrum zdarzeń, aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).

Klonowanie projektu przykładowego

Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Odczyt z usługi Event Hubs dla platformy Kafka

Po dokonaniu kilku zmian w konfiguracji możesz rozpocząć odczyt z usługi Event Hubs dla platformy Kafka. Po zaktualizowaniu zmiennych BOOTSTRAP_SERVERS i EH_SASL za pomocą szczegółowych informacji dotyczących przestrzeni nazw możesz rozpocząć przesyłanie strumieniowe z usługi Event Hubs tak samo jak z platformy Kafka. Kompletny kod przykładowy znajduje się w pliku sparkConsumer.scala w witrynie GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Jeśli wystąpi błąd podobny do następującego błędu, dodaj .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") do spark.readStream wywołania i spróbuj ponownie.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Zapis do usługi Event Hubs dla platformy Kafka

Zapis do usługi Event Hubs może również wyglądać tak samo jak zapis do platformy Kafka. Nie zapomnij zaktualizować konfiguracji, wstawiając dla zmiennych BOOTSTRAP_SERVERS i EH_SASL informacje z przestrzeni nazw usługi Event Hubs. Kompletny kod przykładowy znajduje się w pliku sparkProducer.scala w witrynie GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: