Dataströmbearbetning med Apache Kafka och Azure Databricks
I den här artikeln beskrivs hur du kan använda Apache Kafka som källa eller mottagare när du kör arbetsbelastningar för strukturerad direktuppspelning på Azure Databricks.
Mer information om Kafka finns i Kafka-dokumentationen.
Läsa data från Kafka
Följande är ett exempel på en strömningsläsning från Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks stöder även batchläsningssemantik för Kafka-datakällor, som du ser i följande exempel:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
För inkrementell batchinläsning rekommenderar Databricks att du använder Kafka med Trigger.AvailableNow
. Se Konfigurera inkrementell batchbearbetning.
I Databricks Runtime 13.3 LTS och senare tillhandahåller Azure Databricks en SQL-funktion för att läsa Kafka-data. Direktuppspelning med SQL stöds endast i Delta Live Tables eller med strömmande tabeller i Databricks SQL. Se read_kafka tabellvärdesfunktion.
Konfigurera Kafka Structured Streaming-läsare
Azure Databricks tillhandahåller nyckelordet kafka
som ett dataformat för att konfigurera anslutningar till Kafka 0.10+.
Följande är de vanligaste konfigurationerna för Kafka:
Det finns flera sätt att ange vilka ämnen som ska prenumerera på. Du bör bara ange en av dessa parametrar:
Alternativ | Värde | beskrivning |
---|---|---|
prenumerera | En kommaavgränsad lista med ämnen. | Ämneslistan att prenumerera på. |
subscribePattern | Java regex-sträng. | Det mönster som används för att prenumerera på ämnen. |
tilldela | JSON-sträng {"topicA":[0,1],"topic":[2,4]} . |
Specifika topicPartitioner att använda. |
Andra viktiga konfigurationer:
Alternativ | Värde | Standardvärde | beskrivning |
---|---|---|---|
kafka.bootstrap.servers | Kommaavgränsad lista över värd:port. | empty | [Krävs] Kafka-konfigurationen bootstrap.servers . Om du upptäcker att det inte finns några data från Kafka kontrollerar du listan med koordinatoradresser först. Om koordinatoradresslistan är felaktig kanske det inte finns några fel. Detta beror på att Kafka-klienten förutsätter att koordinatorerna blir tillgängliga så småningom och om nätverksfel försöker igen för alltid. |
failOnDataLoss | true eller false . |
true |
[Valfritt] Om frågan ska misslyckas när det är möjligt att data har gått förlorade. Frågor kan permanent misslyckas med att läsa data från Kafka på grund av många scenarier, till exempel borttagna ämnen, ämnestrunkering före bearbetning och så vidare. Vi försöker uppskatta om data eventuellt har gått förlorade eller inte. Ibland kan detta orsaka falska larm. Ange det här alternativet till false om det inte fungerar som förväntat, eller om du vill att frågan ska fortsätta bearbetas trots dataförlust. |
minPartitions | Heltal >= 0, 0 = inaktiverat. | 0 (inaktiverad) | [Valfritt] Minsta antal partitioner som ska läsas från Kafka. Du kan konfigurera Spark att använda ett godtyckligt minimum av partitioner för att läsa från Kafka med hjälp minPartitions av alternativet . Normalt har Spark en 1-1-mappning av Kafka topicPartitions till Spark-partitioner som förbrukar från Kafka. Om du ställer in minPartitions alternativet på ett värde som är större än dina Kafka topicPartitions kommer Spark att dela upp stora Kafka-partitioner till mindre delar. Det här alternativet kan ställas in vid tider med hög belastning, datasnedvridning och när dataströmmen halkar efter för att öka bearbetningshastigheten. Det kostar att initiera Kafka-konsumenter vid varje utlösare, vilket kan påverka prestanda om du använder SSL när du ansluter till Kafka. |
kafka.group.id | Ett Kafka-konsumentgrupps-ID. | inte inställt | [Valfritt] Grupp-ID som ska användas vid läsning från Kafka. Använd detta med försiktighet. Som standard genererar varje fråga ett unikt grupp-ID för att läsa data. Detta säkerställer att varje fråga har en egen konsumentgrupp som inte utsätts för interferens från någon annan konsument och därför kan läsa alla partitioner i sina prenumerationsavsnitt. I vissa scenarier (till exempel Kafka-gruppbaserad auktorisering) kanske du vill använda specifika auktoriserade grupp-ID:er för att läsa data. Du kan också ange grupp-ID:t. Men gör detta med extrem försiktighet eftersom det kan orsaka oväntat beteende. – Frågor som körs samtidigt (både batch och direktuppspelning) med samma grupp-ID stör sannolikt varandra, vilket gör att varje fråga endast läser en del av data. – Detta kan också inträffa när frågor startas/startas om i snabb följd. Om du vill minimera sådana problem anger du att Kafka-konsumentkonfigurationen session.timeout.ms är mycket liten. |
startingOffsets | tidigaste , senaste | senaste | [Valfritt] Startpunkten när en fråga startas, antingen "tidigast" som är från de tidigaste förskjutningarna, eller en json-sträng som anger en startförskjutning för varje TopicPartition. I json kan -2 som förskjutning användas för att referera till tidigaste, -1 till senaste. Obs! För batchfrågor tillåts inte den senaste (implicit eller med hjälp av -1 i json). För direktuppspelningsfrågor gäller detta bara när en ny fråga startas, och det återupptas alltid där frågan slutade. Nyligen identifierade partitioner under en fråga startar tidigast. |
Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.
Schema för Kafka-poster
Schemat för Kafka-poster är:
Column | Typ |
---|---|
key | binary |
värde | binary |
Avsnitt | sträng |
skifte | heltal |
förskjutning | lång |
timestamp | lång |
timestampType | heltal |
key
Och value
är alltid deserialiserade som bytematriser med ByteArrayDeserializer
. Använd DataFrame-åtgärder (till exempel cast("string")
) för att explicit deserialisera nycklar och värden.
Skriva data till Kafka
Följande är ett exempel på en direktuppspelningsskrivning till Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks stöder även batchskrivningssemantik till Kafka-datamottagare, som du ser i följande exempel:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Konfigurera Kafka Structured Streaming Writer
Viktigt!
Databricks Runtime 13.3 LTS och senare innehåller en nyare version av kafka-clients
biblioteket som aktiverar idempotent-skrivningar som standard. Om en Kafka-mottagare använder version 2.8.0 eller senare med ACL:er konfigurerade, men utan IDEMPOTENT_WRITE
aktiverad, misslyckas skrivning med felmeddelandet org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Lös det här felet genom att uppgradera till Kafka version 2.8.0 eller senare, eller genom att ange .option(“kafka.enable.idempotence”, “false”)
när du konfigurerar din structured streaming-skrivare.
Schemat som tillhandahålls till DataStreamWriter interagerar med Kafka-mottagaren. Du kan använda följande fält:
Kolumnnamn | Obligatorisk eller valfri | Typ |
---|---|---|
key |
valfri | STRING eller BINARY |
value |
required | STRING eller BINARY |
headers |
valfri | ARRAY |
topic |
valfritt (ignoreras om topic anges som skrivalternativ) |
STRING |
partition |
valfri | INT |
Följande är vanliga alternativ som anges när du skriver till Kafka:
Alternativ | Värde | Standardvärde | beskrivning |
---|---|---|---|
kafka.boostrap.servers |
En kommaavgränsad lista över <host:port> |
inget | [Krävs] Kafka-konfigurationen bootstrap.servers . |
topic |
STRING |
inte inställt | [Valfritt] Anger ämnet för alla rader som ska skrivas. Det här alternativet åsidosätter alla ämneskolumner som finns i data. |
includeHeaders |
BOOLEAN |
false |
[Valfritt] Om Kafka-rubrikerna ska inkluderas på raden. |
Se Integrationsguide för strukturerad direktuppspelning kafka för andra valfria konfigurationer.
Hämta Kafka-mått
Du kan få medelvärdet, min och max för antalet förskjutningar som strömningsfrågan ligger bakom den senaste tillgängliga förskjutningen bland alla prenumererade ämnen med måtten avgOffsetsBehindLatest
, maxOffsetsBehindLatest
och minOffsetsBehindLatest
. Se Läsa mått interaktivt.
Kommentar
Tillgänglig i Databricks Runtime 9.1 och senare.
Hämta det uppskattade totala antalet byte som frågeprocessen inte har förbrukat från de prenumererade ämnena genom att undersöka värdet för estimatedTotalBytesBehindLatest
. Den här uppskattningen baseras på de batchar som bearbetats under de senaste 300 sekunderna. Den tidsram som uppskattningen baseras på kan ändras genom att alternativet bytesEstimateWindowLength
anges till ett annat värde. Om du till exempel vill ange den till 10 minuter:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Om du kör dataströmmen i en notebook-fil kan du se dessa mått under fliken Rådata på instrumentpanelen för strömningsfrågans förlopp:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
Använda SSL för att ansluta Azure Databricks till Kafka
Om du vill aktivera SSL-anslutningar till Kafka följer du anvisningarna i Confluent-dokumentationen Kryptering och autentisering med SSL. Du kan ange de konfigurationer som beskrivs där, prefixet med kafka.
, som alternativ. Du kan till exempel ange platsen för förtroendearkivet i egenskapen kafka.ssl.truststore.location
.
Databricks rekommenderar att du:
- Lagra dina certifikat i molnobjektlagring. Du kan endast begränsa åtkomsten till certifikaten till kluster som har åtkomst till Kafka. Se Datastyrning med Unity Catalog.
- Lagra dina certifikatlösenord som hemligheter i ett hemligt omfång.
I följande exempel används objektlagringsplatser och Databricks-hemligheter för att aktivera en SSL-anslutning:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Ansluta Kafka på HDInsight till Azure Databricks
Skapa ett HDInsight Kafka-kluster.
Mer information finns i Ansluta till Kafka i HDInsight via ett virtuellt Azure-nätverk .
Konfigurera Kafka-koordinatorerna så att de annonserar rätt adress.
Följ anvisningarna i Konfigurera Kafka för IP-reklam. Om du hanterar Kafka själv på Azure Virtual Machines kontrollerar du att konfigurationen
advertised.listeners
av koordinatorerna är inställd på värdarnas interna IP-adress.Skapa ett Azure Databricks-kluster.
Peer-koppla Kafka-klustret till Azure Databricks-klustret.
Följ anvisningarna i peer-virtuella nätverk.
Autentisering med tjänstens huvudnamn med Microsoft Entra-ID och Azure Event Hubs
Azure Databricks stöder autentisering av Spark-jobb med Event Hubs-tjänster. Den här autentiseringen görs via OAuth med Microsoft Entra-ID.
Azure Databricks stöder Microsoft Entra-ID-autentisering med ett klient-ID och en hemlighet i följande beräkningsmiljöer:
- Databricks Runtime 12.2 LTS och senare vid beräkning som konfigurerats med åtkomstläge för en användare.
- Databricks Runtime 14.3 LTS och senare vid beräkning som konfigurerats med läget för delad åtkomst.
- Delta Live Tables-pipelines konfigurerade utan Unity Catalog.
Azure Databricks stöder inte Microsoft Entra-ID-autentisering med ett certifikat i någon beräkningsmiljö eller i Delta Live Tables-pipelines som konfigurerats med Unity Catalog.
Den här autentiseringen fungerar inte på delade kluster eller på Unity Catalog Delta Live Tables.
Konfigurera Kafka-anslutningsappen för strukturerad direktuppspelning
För att kunna utföra autentisering med Microsoft Entra-ID behöver du följande värden:
Ett klientorganisations-ID. Du hittar detta på fliken Microsoft Entra ID-tjänster .
Ett clientID (även kallat program-ID).
En klienthemlighet. När du har det här bör du lägga till det som en hemlighet i din Databricks-arbetsyta. Information om hur du lägger till den här hemligheten finns i Hemlighetshantering.
Ett EventHubs-ämne. Du hittar en lista med ämnen i avsnittet Event Hubs under avsnittet Entiteter på en specifik sida för Event Hubs-namnrymd. Om du vill arbeta med flera ämnen kan du ange IAM-rollen på Event Hubs-nivå.
En EventHubs-server. Du hittar detta på översiktssidan för ditt specifika Event Hubs-namnområde:
För att kunna använda Entra-ID måste vi dessutom be Kafka att använda OAuth SASL-mekanismen (SASL är ett generiskt protokoll och OAuth är en typ av SASL-mekanism):
kafka.security.protocol
bör varaSASL_SSL
kafka.sasl.mechanism
bör varaOAUTHBEARER
kafka.sasl.login.callback.handler.class
bör vara ett fullständigt kvalificerat namn på Java-klassen med värdetkafkashaded
för till hanteraren för återanrop för inloggning för vår skuggade Kafka-klass. Se följande exempel för den exakta klassen.
Exempel
Nu ska vi titta på ett exempel som körs:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Hantera potentiella fel
Strömningsalternativ stöds inte.
Om du försöker använda den här autentiseringsmekanismen i en Delta Live Tables-pipeline som konfigurerats med Unity Catalog kan du få följande fel:
Lös det här felet genom att använda en beräkningskonfiguration som stöds. Se Autentisering med tjänstens huvudnamn med Microsoft Entra-ID och Azure Event Hubs.
Det gick inte att skapa en ny
KafkaAdminClient
.Det här är ett internt fel som Kafka genererar om något av följande autentiseringsalternativ är felaktigt:
- Klient-ID (även kallat program-ID)
- Klientorganisations-ID
- EventHubs-server
För att lösa felet kontrollerar du att värdena är korrekta för de här alternativen.
Dessutom kan det här felet visas om du ändrar konfigurationsalternativen som anges som standard i exemplet (som du uppmanas att inte ändra), till exempel
kafka.security.protocol
.Det finns inga poster som returneras
Om du försöker visa eller bearbeta din DataFrame men inte får resultat visas följande i användargränssnittet.
Det här meddelandet innebär att autentiseringen lyckades, men EventHubs returnerade inga data. Några möjliga (men inte på något sätt uttömmande) skäl är:
- Du har angett fel EventHubs-ämne .
- Standardalternativet kafka-konfiguration för
startingOffsets
ärlatest
, och du tar för närvarande inte emot några data via ämnet ännu. Du kan angestartingOffsetstoearliest
att börja läsa data från Kafkas tidigaste förskjutningar.