Ligar a aplicação Apache Spark ao Hubs de Eventos do Azure

Este tutorial orienta-o ao longo da ligação da sua aplicação Spark aos Hubs de Eventos para transmissão em fluxo em tempo real. Esta integração permite a transmissão em fluxo sem ter de alterar os seus clientes de protocolo ou executar os seus próprios clusters kafka ou Zookeeper. Este tutorial requer o Apache Spark v2.4+ e o Apache Kafka v2.0+.

Nota

Este exemplo está disponível no GitHub

Neste tutorial, ficará a saber como:

  • Criar um espaço de nomes dos Hubs de Eventos
  • Clonar o projeto de exemplo
  • Executar o Spark
  • Ler a partir Hubs de Eventos para Kafka
  • Escrever nos Hubs de Eventos para Kafka

Pré-requisitos

Antes de começar o tutorial, confirme que tem:

Nota

O adaptador Spark-Kafka foi atualizado para suportar o Kafka 2.0 a partir do Spark v2.4. Nas versões anteriores do Spark, o adaptador suportava o Kafka v0.10 e posteriores, mas dependia especificamente das APIs do Kafka v0.10. Uma vez que os Hubs de Eventos para Kafka não suportam o Kafka v0.10, os ecossistemas dos Hubs de Eventos para Kafka não suportam os adaptadores Spark-Kafka de versões do Spark anteriores à v2.4.

Criar um espaço de nomes dos Hubs de Eventos

É necessário um espaço de nomes dos Hubs de Eventos para enviar e receber a partir de qualquer serviços dos Hubs de Eventos. Veja Criar um hub de eventos para obter instruções para criar um espaço de nomes e um hub de eventos. Obtenha a cadeia de ligação dos Hubs de Eventos e o nome de domínio completamente qualificado (FQDN), para utilizar mais tarde. Para obter instruções, veja Get an Event Hubs connection string (Obter uma cadeia de ligação dos Hubs de Eventos).

Clonar o projeto de exemplo

Clone o repositório dos Hubs de Eventos do Azure e navegue para a subpasta tutorials/spark:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Ler a partir Hubs de Eventos para Kafka

Com poucas alterações à configuração, pode começar a ler a partir dos Hubs de Eventos para Kafka. Atualize BOOTSTRAP_SERVERS e EH_SASL com detalhes do espaço de nomes e pode começar a transmitir em fluxo com os Hubs de Eventos tal como faria com o Kafka. Para obter o código de exemplo completo, veja o ficheiro sparkConsumer.scala no GitHub.

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

Se receber um erro semelhante ao seguinte erro, adicione .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") à spark.readStream chamada e tente novamente.

IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets 

Escrever nos Hubs de Eventos para Kafka

Também pode escrever nos Hubs de Eventos da mesma forma que escreve no Kafka. Não se esqueça de atualizar a configuração para alterar BOOTSTRAP_SERVERS e EH_SASL com as informações do espaço de nomes dos Hubs de Eventos. Para obter o código de exemplo completo, veja o ficheiro sparkProducer.scala no GitHub.

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

Passos seguintes

Para saber mais sobre os Hubs de Eventos e os Hubs de Eventos para Kafka, veja os seguintes artigos: