Elaborazione di flussi con Apache Kafka e Azure Databricks

Questo articolo descrive come usare Apache Kafka come origine o sink durante l'esecuzione di carichi di lavoro Structured Streaming in Azure Databricks.

Per altre informazioni su Kafka, vedere la documentazione di Kafka.

Leggere i dati da Kafka

Di seguito è riportato un esempio di streaming letto da Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks supporta anche la semantica di lettura batch per le origini dati Kafka, come illustrato nell'esempio seguente:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Per il caricamento batch incrementale, Databricks consiglia di usare Kafka con Trigger.AvailableNow. Vedere Configurazione dell'elaborazione batch incrementale.

In Databricks Runtime 13.3 LTS e versioni successive, Azure Databricks fornisce una funzione SQL per la lettura dei dati Kafka. Lo streaming con SQL è supportato solo in tabelle Live Delta o con tabelle di streaming in Databricks SQL. Vedere read_kafka funzione con valori di tabella.

Configurare il lettore Di streaming strutturato Kafka

Azure Databricks fornisce la kafka parola chiave come formato dati per configurare le connessioni a Kafka 0.10+.

Di seguito sono riportate le configurazioni più comuni per Kafka:

Esistono diversi modi per specificare gli argomenti da sottoscrivere. È consigliabile specificare solo uno di questi parametri:

Opzione Valore Descrizione
subscribe Elenco di argomenti delimitato da virgole. Elenco di argomenti a cui sottoscrivere.
subscribePattern Stringa regex Java. Modello utilizzato per sottoscrivere gli argomenti.
assign Stringa {"topicA":[0,1],"topic":[2,4]}JSON . Argomento specificoPartitions da utilizzare.

Altre configurazioni rilevanti:

Opzione Valore Valore predefinito Descrizione
kafka.bootstrap.servers Elenco delimitato da virgole di host:port. empty [Obbligatorio] Configurazione di Kafka bootstrap.servers . Se non sono presenti dati da Kafka, controllare prima l'elenco di indirizzi del broker. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. Ciò è dovuto al fatto che il client Kafka presuppone che i broker diventino disponibili alla fine e, in caso di errori di rete, riprovare per sempre.
failOnDataLoss true o false. true [Facoltativo] Indica se la query non riesce quando è possibile che i dati siano andato persi. Le query possono non riuscire a leggere in modo permanente i dati da Kafka a causa di molti scenari, ad esempio argomenti eliminati, troncamento degli argomenti prima dell'elaborazione e così via. Si tenta di stimare in modo conservativo se i dati sono stati probabilmente persi o meno. A volte questo può causare falsi allarmi. Impostare questa opzione su false se non funziona come previsto o si vuole che la query continui l'elaborazione nonostante la perdita di dati.
minPartitions Intero >= 0, 0 = disabilitato. 0 (disabilitata) [Facoltativo] Numero minimo di partizioni da leggere da Kafka. È possibile configurare Spark per usare un minimo arbitrario di partizioni da leggere da Kafka usando l'opzione minPartitions . In genere Spark ha un mapping 1-1 di argomenti KafkaPartitions alle partizioni Spark che utilizzano da Kafka. Se si imposta l'opzione minPartitions su un valore maggiore di quello dell'argomento KafkaPartitions, Spark eseguirà il riepilogo delle partizioni Kafka di grandi dimensioni su parti più piccole. Questa opzione può essere impostata in momenti di picco di caricamento, sfasamento dei dati e man mano che il flusso è in ritardo per aumentare la velocità di elaborazione. Si tratta di un costo di inizializzazione dei consumer Kafka in ogni trigger, che può influire sulle prestazioni se si usa SSL durante la connessione a Kafka.
kafka.group.id ID gruppo di consumer Kafka. non impostato [Facoltativo] ID gruppo da usare durante la lettura da Kafka. Usare questa operazione con cautela. Per impostazione predefinita, ogni query genera un ID gruppo univoco per la lettura dei dati. In questo modo ogni query ha un proprio gruppo di consumer che non presenta interferenze da nessun altro consumer e pertanto può leggere tutte le partizioni degli argomenti sottoscritti. In alcuni scenari, ad esempio l'autorizzazione basata su gruppo Kafka, è possibile usare ID gruppo autorizzati specifici per leggere i dati. Facoltativamente, è possibile impostare l'ID gruppo. Tuttavia, eseguire questa operazione con estrema cautela perché può causare comportamenti imprevisti.

* È probabile che le query in esecuzione simultanea (sia batch che streaming) con lo stesso ID gruppo interferiscano l'una con l'altra, causando la sola lettura di parte dei dati.
* Questo può verificarsi anche quando le query vengono avviate/riavviate in rapida successione. Per ridurre al minimo questi problemi, impostare la configurazione session.timeout.ms del consumer Kafka in modo che sia molto piccola.
startingOffsets prima , più recente più recente [Facoltativo] Punto iniziale all'avvio di una query, ovvero "meno recente" proveniente dai primi offset o da una stringa json che specifica un offset iniziale per ogni ArgomentoPartition. Nel file JSON, -2 come offset può essere usato per fare riferimento alla prima versione, -1 alla versione più recente. Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in json) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query e che la ripresa riprenderà sempre da dove la query è stata interrotta. Le nuove partizioni individuate durante una query inizieranno al più presto.

Per altre configurazioni facoltative, vedere Structured Streaming Kafka Integration Guide (Guida all'integrazione di Structured Streaming Kafka).

Schema per i record Kafka

Lo schema dei record Kafka è:

Column Type
key binary
value binary
argomento string
denominata int
offset long
timestamp long
timestampType int

key e value vengono sempre deserializzati come matrici di byte con .ByteArrayDeserializer Usare operazioni dataframe (ad esempio cast("string")) per deserializzare in modo esplicito le chiavi e i valori.

Scrivere dati in Kafka

Di seguito è riportato un esempio di scrittura di streaming in Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks supporta anche la semantica di scrittura batch nei sink di dati Kafka, come illustrato nell'esempio seguente:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Configurare il writer di streaming strutturato Kafka

Importante

Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della kafka-clients libreria che abilita le scritture idempotenti per impostazione predefinita. Se un sink Kafka usa la versione 2.8.0 o successiva con ACL configurati, ma senza IDEMPOTENT_WRITE abilitato, la scrittura non riesce con il messaggio org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error statedi errore .

Risolvere questo errore eseguendo l'aggiornamento a Kafka versione 2.8.0 o successiva oppure impostando .option(“kafka.enable.idempotence”, “false”) durante la configurazione del writer Structured Streaming.

Lo schema fornito a DataStreamWriter interagisce con il sink Kafka. È possibile usare i campi seguenti:

Nome colonna Obbligatorio o facoltativo Type
key facoltative STRING oppure BINARY
value Obbligatorio STRING oppure BINARY
headers facoltative ARRAY
topic facoltativo (ignorato se topic è impostato come opzione writer) STRING
partition facoltative INT

Di seguito sono riportate le opzioni comuni impostate durante la scrittura in Kafka:

Opzione Valore Valore predefinito Descrizione
kafka.boostrap.servers Elenco delimitato da virgole di <host:port> Nessuno [Obbligatorio] Configurazione di Kafka bootstrap.servers .
topic STRING non impostato [Facoltativo] Imposta l'argomento per tutte le righe da scrivere. Questa opzione esegue l'override di qualsiasi colonna di argomento presente nei dati.
includeHeaders BOOLEAN false [Facoltativo] Indica se includere le intestazioni Kafka nella riga.

Per altre configurazioni facoltative, vedere Structured Streaming Kafka Integration Guide (Guida all'integrazione di Structured Streaming Kafka).

Recuperare le metriche Kafka

È possibile ottenere la media, min e il numero massimo di offset che la query di streaming è dietro l'offset disponibile più recente tra tutti gli argomenti sottoscritti con le avgOffsetsBehindLatestmetriche , maxOffsetsBehindLateste minOffsetsBehindLatest . Vedere Lettura interattiva delle metriche.

Nota

Disponibile in Databricks Runtime 9.1 e versioni successive.

Ottenere il numero totale stimato di byte che il processo di query non ha utilizzato dagli argomenti sottoscritti esaminando il valore di estimatedTotalBytesBehindLatest. Questa stima è basata sui batch elaborati negli ultimi 300 secondi. L'intervallo di tempo su cui si basa la stima può essere modificato impostando l'opzione bytesEstimateWindowLength su un valore diverso. Ad esempio, per impostarlo su 10 minuti:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Se si esegue il flusso in un notebook, è possibile visualizzare queste metriche nella scheda Dati non elaborati nel dashboard di stato della query di streaming:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Usare SSL per connettere Azure Databricks a Kafka

Per abilitare le connessioni SSL a Kafka, seguire le istruzioni nella documentazione di Confluent Crittografia e autenticazione con SSL. È possibile specificare le configurazioni descritte, precedute da kafka., come opzioni. Ad esempio, si specifica il percorso dell'archivio attendibilità nella proprietà kafka.ssl.truststore.location.

Databricks consiglia di:

  • Archiviare i certificati nell'archiviazione di oggetti cloud. È possibile limitare l'accesso ai certificati solo ai cluster che possono accedere a Kafka. Vedere Governance dei dati con Unity Catalog.
  • Archiviare le password del certificato come segreti in un ambito segreto.

L'esempio seguente usa i percorsi di archiviazione degli oggetti e i segreti di Databricks per abilitare una connessione SSL:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Connessione Kafka in HDInsight in Azure Databricks

  1. Creare un cluster HDInsight Kafka.

    Per istruzioni, vedere Connessione a Kafka in HDInsight tramite un Rete virtuale di Azure.

  2. Configurare i broker Kafka per annunciare l'indirizzo corretto.

    Seguire le istruzioni in Configurare Kafka per la pubblicità IP. Se si gestisce Kafka in Azure Macchine virtuali, assicurarsi che la advertised.listeners configurazione dei broker sia impostata sull'indirizzo IP interno degli host.

  3. Creare un cluster di Azure Databricks.

  4. Eseguire il peering del cluster Kafka al cluster Azure Databricks.

    Seguire le istruzioni in Peer virtual networks (Reti virtuali peer).

Autenticazione dell'entità servizio con l'ID Microsoft Entra (in precedenza Azure Active Directory) e Hub eventi di Azure

Azure Databricks supporta l'autenticazione dei processi Spark con i servizi di Hub eventi. Questa autenticazione viene eseguita tramite OAuth con Microsoft Entra ID (in precedenza Azure Active Directory).

Diagramma di autenticazione AAD

Azure Databricks supporta l'autenticazione con ID Microsoft Entra con un ID client e un segreto negli ambienti di calcolo seguenti:

  • Databricks Runtime 12.2 LTS e versioni successive nel calcolo configurato con la modalità di accesso utente singolo.
  • Databricks Runtime 14.3 LTS e versioni successive nel calcolo configurato con la modalità di accesso condiviso.
  • Pipeline di tabelle live delta configurate senza Catalogo Unity.

Azure Databricks non supporta l'autenticazione di Microsoft Entra ID con un certificato in qualsiasi ambiente di calcolo o nelle pipeline delta live tables configurate con Unity Catalog.

Questa autenticazione non funziona nei cluster condivisi o nelle tabelle live delta del catalogo Unity.

Configurazione del Connessione or Structured Streaming Kafka

Per eseguire l'autenticazione con Microsoft Entra ID, sono necessari i valori seguenti:

  • ID tenant. È possibile trovarla nella scheda Servizi MICROSOFT Entra ID .

  • Id client (noto anche come ID applicazione).

  • Segreto client. Una volta ottenuto questo, è necessario aggiungerlo come segreto all'area di lavoro di Databricks. Per aggiungere questo segreto, vedere Gestione dei segreti.

  • Argomento di EventHubs. È possibile trovare un elenco di argomenti nella sezione Hub eventi nella sezione Entità in una pagina specifica dello spazio dei nomi di Hub eventi. Per usare più argomenti, è possibile impostare il ruolo IAM a livello di Hub eventi.

  • Un server EventHubs. È possibile trovarla nella pagina di panoramica dello spazio dei nomi di Hub eventi specifico:

    Spazio dei nomi di Hub eventi

Inoltre, per usare Entra ID, è necessario indicare a Kafka di usare il meccanismo SASL OAuth (SASL è un protocollo generico e OAuth è un tipo di "meccanismo" SASL):

  • kafka.security.protocol dovrebbe essere SASL_SSL
  • kafka.sasl.mechanism dovrebbe essere OAUTHBEARER
  • kafka.sasl.login.callback.handler.class deve essere un nome completo della classe Java con un valore pari kafkashaded al gestore di callback di accesso della classe Kafka ombreggiata. Vedere l'esempio seguente per la classe esatta.

Esempio

Si esaminerà quindi un esempio in esecuzione:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Gestione di potenziali errori

  • Le opzioni di streaming non sono supportate.

    Se si tenta di usare questo meccanismo di autenticazione in una pipeline delta live tables configurata con Unity Catalog, è possibile che venga visualizzato l'errore seguente:

    Errore di streaming non supportato

    Per risolvere questo errore, usare una configurazione di calcolo supportata. Vedere Autenticazione dell'entità servizio con Microsoft Entra ID (in precedenza Azure Active Directory) e Hub eventi di Azure.

  • Impossibile creare un nuovo KafkaAdminClientoggetto .

    Si tratta di un errore interno che Kafka genera se una delle opzioni di autenticazione seguenti non è corretta:

    • ID client (noto anche come ID applicazione)
    • ID tenant
    • Server EventHubs

    Per risolvere l'errore, verificare che i valori siano corretti per queste opzioni.

    Inoltre, è possibile che venga visualizzato questo errore se si modificano le opzioni di configurazione fornite per impostazione predefinita nell'esempio (a cui è stato chiesto di non modificare), ad esempio kafka.security.protocol.

  • Non vengono restituiti record

    Se si sta tentando di visualizzare o elaborare il dataframe ma non si ottengono risultati, nell'interfaccia utente verrà visualizzato quanto segue.

    Nessun messaggio di risultati

    Questo messaggio indica che l'autenticazione ha avuto esito positivo, ma EventHubs non ha restituito dati. Alcuni possibili motivi (anche se non esaustivi) sono:

    • È stato specificato l'argomento EventHubs errato.
    • L'opzione di configurazione Kafka predefinita per startingOffsets è lateste attualmente non si ricevono dati tramite l'argomento. È possibile impostare startingOffsetstoearliest per iniziare a leggere i dati a partire dai primi offset di Kafka.