Dataströmbearbetning med Apache Kafka och Azure Databricks

I den här artikeln beskrivs hur du kan använda Apache Kafka som källa eller mottagare när du kör arbetsbelastningar för strukturerad direktuppspelning på Azure Databricks.

Mer information om Kafka finns i Kafka-dokumentationen.

Läsa data från Kafka

Följande är ett exempel på en strömningsläsning från Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks stöder även batchläsningssemantik för Kafka-datakällor, som du ser i följande exempel:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

För inkrementell batchinläsning rekommenderar Databricks att du använder Kafka med Trigger.AvailableNow. Se Konfigurera inkrementell batchbearbetning.

I Databricks Runtime 13.3 LTS och senare tillhandahåller Azure Databricks en SQL-funktion för att läsa Kafka-data. Direktuppspelning med SQL stöds endast i Delta Live Tables eller med strömmande tabeller i Databricks SQL. Se read_kafka tabellvärdesfunktion.

Konfigurera Kafka Structured Streaming-läsare

Azure Databricks tillhandahåller nyckelordet kafka som ett dataformat för att konfigurera anslutningar till Kafka 0.10+.

Följande är de vanligaste konfigurationerna för Kafka:

Det finns flera sätt att ange vilka ämnen som ska prenumerera på. Du bör bara ange en av dessa parametrar:

Alternativ Värde beskrivning
prenumerera En kommaavgränsad lista med ämnen. Ämneslistan att prenumerera på.
subscribePattern Java regex-sträng. Det mönster som används för att prenumerera på ämnen.
tilldela JSON-sträng {"topicA":[0,1],"topic":[2,4]}. Specifika topicPartitioner att använda.

Andra viktiga konfigurationer:

Alternativ Värde Standardvärde beskrivning
kafka.bootstrap.servers Kommaavgränsad lista över värd:port. empty [Krävs] Kafka-konfigurationen bootstrap.servers . Om du upptäcker att det inte finns några data från Kafka kontrollerar du listan med koordinatoradresser först. Om koordinatoradresslistan är felaktig kanske det inte finns några fel. Detta beror på att Kafka-klienten förutsätter att koordinatorerna blir tillgängliga så småningom och om nätverksfel försöker igen för alltid.
failOnDataLoss true eller false. true [Valfritt] Om frågan ska misslyckas när det är möjligt att data har gått förlorade. Frågor kan permanent misslyckas med att läsa data från Kafka på grund av många scenarier, till exempel borttagna ämnen, ämnestrunkering före bearbetning och så vidare. Vi försöker uppskatta om data eventuellt har gått förlorade eller inte. Ibland kan detta orsaka falska larm. Ange det här alternativet till false om det inte fungerar som förväntat, eller om du vill att frågan ska fortsätta bearbetas trots dataförlust.
minPartitions Heltal >= 0, 0 = inaktiverat. 0 (inaktiverad) [Valfritt] Minsta antal partitioner som ska läsas från Kafka. Du kan konfigurera Spark att använda ett godtyckligt minimum av partitioner för att läsa från Kafka med hjälp minPartitions av alternativet . Normalt har Spark en 1-1-mappning av Kafka topicPartitions till Spark-partitioner som förbrukar från Kafka. Om du ställer in minPartitions alternativet på ett värde som är större än dina Kafka topicPartitions kommer Spark att dela upp stora Kafka-partitioner till mindre delar. Det här alternativet kan ställas in vid tider med hög belastning, datasnedvridning och när dataströmmen halkar efter för att öka bearbetningshastigheten. Det kostar att initiera Kafka-konsumenter vid varje utlösare, vilket kan påverka prestanda om du använder SSL när du ansluter till Kafka.
kafka.group.id Ett Kafka-konsumentgrupps-ID. inte inställt [Valfritt] Grupp-ID som ska användas vid läsning från Kafka. Använd detta med försiktighet. Som standard genererar varje fråga ett unikt grupp-ID för att läsa data. Detta säkerställer att varje fråga har en egen konsumentgrupp som inte utsätts för interferens från någon annan konsument och därför kan läsa alla partitioner i sina prenumerationsavsnitt. I vissa scenarier (till exempel Kafka-gruppbaserad auktorisering) kanske du vill använda specifika auktoriserade grupp-ID:er för att läsa data. Du kan också ange grupp-ID:t. Men gör detta med extrem försiktighet eftersom det kan orsaka oväntat beteende.

* Frågor som körs samtidigt (både batch och direktuppspelning) med samma grupp-ID stör sannolikt varandra, vilket gör att varje fråga endast läser en del av data.
* Detta kan också inträffa när frågor startas/startas om i snabb följd. Om du vill minimera sådana problem anger du att Kafka-konsumentkonfigurationen session.timeout.ms är mycket liten.
startingOffsets tidigaste , senaste senaste [Valfritt] Startpunkten när en fråga startas, antingen "tidigast" som är från de tidigaste förskjutningarna, eller en json-sträng som anger en startförskjutning för varje TopicPartition. I json kan -2 som förskjutning användas för att referera till tidigaste, -1 till senaste. Obs! För batchfrågor tillåts inte den senaste (implicit eller med hjälp av -1 i json). För direktuppspelningsfrågor gäller detta bara när en ny fråga startas, och det återupptas alltid där frågan slutade. Nyligen identifierade partitioner under en fråga startar tidigast.

Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.

Schema för Kafka-poster

Schemat för Kafka-poster är:

Column Typ
key binary
värde binary
Avsnitt sträng
Partition heltal
förskjutning lång
timestamp lång
timestampType heltal

key Och value är alltid deserialiserade som bytematriser med ByteArrayDeserializer. Använd DataFrame-åtgärder (till exempel cast("string")) för att explicit deserialisera nycklar och värden.

Skriva data till Kafka

Följande är ett exempel på en direktuppspelningsskrivning till Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks stöder även batchskrivningssemantik till Kafka-datamottagare, som du ser i följande exempel:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Konfigurera Kafka Structured Streaming Writer

Viktigt!

Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients biblioteket som aktiverar idempotent-skrivningar som standard. Om en Kafka-mottagare använder version 2.8.0 eller senare med ACL:er konfigurerade, men utan IDEMPOTENT_WRITE aktiverad, misslyckas skrivning med felmeddelandet org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state.

Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option(“kafka.enable.idempotence”, “false”) när du konfigurerar din structured streaming-skrivare.

Schemat som tillhandahålls till DataStreamWriter interagerar med Kafka-mottagaren. Du kan använda följande fält:

Kolumnnamn Obligatorisk eller valfri Typ
key valfri STRING eller BINARY
value required STRING eller BINARY
headers valfri ARRAY
topic valfritt (ignoreras om topic anges som skrivalternativ) STRING
partition valfri INT

Följande är vanliga alternativ som anges när du skriver till Kafka:

Alternativ Värde Standardvärde beskrivning
kafka.boostrap.servers En kommaavgränsad lista över <host:port> inget [Krävs] Kafka-konfigurationen bootstrap.servers .
topic STRING inte inställt [Valfritt] Anger ämnet för alla rader som ska skrivas. Det här alternativet åsidosätter alla ämneskolumner som finns i data.
includeHeaders BOOLEAN false [Valfritt] Om Kafka-rubrikerna ska inkluderas på raden.

Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.

Hämta Kafka-mått

Du kan få medelvärdet, min och max för antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumererade ämnen med måtten avgOffsetsBehindLatest, maxOffsetsBehindLatestoch minOffsetsBehindLatest . Se Läsa mått interaktivt.

Kommentar

Tillgänglig i Databricks Runtime 9.1 och senare.

Hämta det uppskattade totala antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena genom att undersöka värdet för estimatedTotalBytesBehindLatest. Den här uppskattningen baseras på de batchar som bearbetats under de senaste 300 sekunderna. Den tidsram som uppskattningen baseras på kan ändras genom att alternativet bytesEstimateWindowLength anges till ett annat värde. Om du till exempel vill ange den till 10 minuter:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Om du kör dataströmmen i en notebook-fil kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

Använda SSL för att ansluta Azure Databricks till Kafka

Om du vill aktivera SSL-anslutningar till Kafka följer du anvisningarna i Confluent-dokumentationen Kryptering och autentisering med SSL. Du kan ange de konfigurationer som beskrivs där, prefixet med kafka., som alternativ. Du kan till exempel ange platsen för förtroendearkivet i egenskapen kafka.ssl.truststore.location.

Databricks rekommenderar att du:

I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Anslut Kafka på HDInsight till Azure Databricks

  1. Skapa ett HDInsight Kafka-kluster.

    Anvisningar finns i Anslut till Kafka i HDInsight via ett virtuellt Azure-nätverk.

  2. Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.

    Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines kontrollerar du att konfigurationen advertised.listeners av koordinatorerna är inställd på värdarnas interna IP-adress.

  3. Skapa ett Azure Databricks-kluster.

  4. Peer-koppla Kafka-klustret till Azure Databricks-klustret.

    Följ anvisningarna i peer-virtuella nätverk.

Autentisering med tjänstens huvudnamn med Microsoft Entra-ID (tidigare Azure Active Directory) och Azure Event Hubs

Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra-ID (tidigare Azure Active Directory).

AAD-autentiseringsdiagram

Azure Databricks stöder Microsoft Entra-ID-autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:

  • Databricks Runtime 12.2 LTS och senare vid beräkning som konfigurerats med åtkomstläge för en användare.
  • Databricks Runtime 14.3 LTS och senare vid beräkning som konfigurerats med läget för delad åtkomst.
  • Delta Live Tables-pipelines konfigurerade utan Unity Catalog.

Azure Databricks stöder inte Microsoft Entra-ID-autentisering med ett certifikat i någon beräkningsmiljö eller i Delta Live Tables-pipelines som konfigurerats med Unity Catalog.

Den här autentiseringen fungerar inte på delade kluster eller på Unity Catalog Delta Live Tables.

Konfigurera Kafka-Anslut eller för strukturerad direktuppspelning

För att kunna utföra autentisering med Microsoft Entra-ID behöver du följande värden:

  • Ett klientorganisations-ID. Du hittar detta på fliken Microsoft Entra ID-tjänster .

  • Ett clientID (även kallat program-ID).

  • En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.

  • Ett EventHubs-ämne. Du hittar en lista med ämnen i avsnittet Event Hubs under avsnittet Entiteter på en specifik sida för Event Hubs-namnrymd. Om du vill arbeta med flera ämnen kan du ange IAM-rollen på Event Hubs-nivå.

  • En EventHubs-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:

    Event Hubs namnrymd

För att kunna använda Entra-ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett generiskt protokoll och OAuth är en typ av SASL-mekanism):

  • kafka.security.protocol bör vara SASL_SSL
  • kafka.sasl.mechanism bör vara OAUTHBEARER
  • kafka.sasl.login.callback.handler.class bör vara ett fullständigt kvalificerat namn på Java-klassen med värdet kafkashaded för till hanteraren för återanrop för inloggning för vår skuggade Kafka-klass. Se följande exempel för den exakta klassen.

Exempel

Nu ska vi titta på ett exempel som körs:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Hantera potentiella fel

  • Strömningsalternativ stöds inte.

    Om du försöker använda den här autentiseringsmekanismen i en Delta Live Tables-pipeline som konfigurerats med Unity Catalog kan du få följande fel:

    Strömningsfel som inte stöds

    Lös det här felet genom att använda en beräkningskonfiguration som stöds. Se Autentisering med tjänstens huvudnamn med Microsoft Entra-ID (tidigare Azure Active Directory) och Azure Event Hubs.

  • Det gick inte att skapa en ny KafkaAdminClient.

    Det här är ett internt fel som Kafka genererar om något av följande autentiseringsalternativ är felaktigt:

    • Klient-ID (även kallat program-ID)
    • Klientorganisations-ID
    • EventHubs-server

    För att lösa felet kontrollerar du att värdena är korrekta för de här alternativen.

    Dessutom kan det här felet visas om du ändrar konfigurationsalternativen som anges som standard i exemplet (som du uppmanas att inte ändra), till exempel kafka.security.protocol.

  • Det finns inga poster som returneras

    Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.

    Inget resultatmeddelande

    Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:

    • Du har angett fel EventHubs-ämne .
    • Standardalternativet kafka-konfiguration för startingOffsets är latest, och du tar för närvarande inte emot några data via ämnet ännu. Du kan ange startingOffsetstoearliest att börja läsa data från Kafkas tidigaste förskjutningar.