Streamverwerking met Apache Kafka en Azure Databricks
In dit artikel wordt beschreven hoe u Apache Kafka kunt gebruiken als bron of sink bij het uitvoeren van structured streaming-workloads in Azure Databricks.
Zie de Kafka-documentatie voor meer Kafka.
Gegevens lezen uit Kafka
Hier volgt een voorbeeld voor een streaming-leesbewerking uit Kafka:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks biedt ook ondersteuning voor semantiek voor batchleesbewerkingen voor Kafka-gegevensbronnen, zoals wordt weergegeven in het volgende voorbeeld:
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Voor het laden van incrementele batches raadt Databricks het gebruik van Kafka aan met Trigger.AvailableNow
. Zie Incrementele batchverwerking configureren.
In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks een SQL-functie voor het lezen van Kafka-gegevens. Streamen met SQL wordt alleen ondersteund in Delta Live Tables of met streamingtabellen in Databricks SQL. Zie read_kafka functie met tabelwaarde.
Kafka Structured Streaming-lezer configureren
Azure Databricks biedt het trefwoord als een gegevensindeling voor het kafka
configureren van verbindingen met Kafka 0.10+.
Hier volgen de meest voorkomende configuraties voor Kafka:
Er zijn meerdere manieren waarop u kunt opgeven op welke onderwerpen u zich wilt abonneren. U moet slechts een van deze parameters opgeven:
Optie | Weergegeven als | Beschrijving |
---|---|---|
abonneren | Een door komma's gescheiden lijst met onderwerpen. | De lijst met onderwerpen waarop u zich wilt abonneren. |
subscribePattern | Java regex-tekenreeks. | Het patroon dat wordt gebruikt om u te abonneren op een of meer onderwerpen. |
toewijzen | JSON-tekenreeks {"topicA":[0,1],"topic":[2,4]} . |
Specifieke topicPartitions die moeten worden gebruikt. |
Andere belangrijke configuraties:
Optie | Weergegeven als | Standaardwaarde | Beschrijving |
---|---|---|---|
kafka.bootstrap.servers | Door komma's gescheiden lijst met host:poort. | empty | [Vereist] De Kafka-configuratie bootstrap.servers . Als u merkt dat er geen gegevens uit Kafka zijn, controleert u eerst de adreslijst van de broker. Als de adreslijst van de broker onjuist is, zijn er mogelijk geen fouten. Dit komt doordat de Kafka-client ervan uitgaat dat de brokers uiteindelijk beschikbaar komen en in het geval van netwerkfouten voor altijd opnieuw proberen. |
failOnDataLoss | true of false . |
true |
[Optioneel] Of de query mislukt wanneer het mogelijk is dat de gegevens verloren zijn gegaan. Query's kunnen gegevens uit Kafka permanent niet lezen vanwege veel scenario's, zoals verwijderde onderwerpen, afkapping van onderwerpen voordat ze worden verwerkt, enzovoort. We proberen conservatief te schatten of gegevens mogelijk verloren zijn gegaan of niet. Soms kan dit valse waarschuwingen veroorzaken. Stel deze optie in op false als deze niet werkt zoals verwacht of als u wilt dat de query blijft verwerken ondanks gegevensverlies. |
minPartitions | Geheel getal >= 0, 0 = uitgeschakeld. | 0 (uitgeschakeld) | [Optioneel] Minimaal aantal partities dat moet worden gelezen uit Kafka. U kunt Spark configureren voor het gebruik van een willekeurig minimum aan partities om vanuit Kafka te lezen met behulp van de minPartitions optie. Normaal gesproken heeft Spark een 1-1 toewijzing van Kafka topicPartitions aan Spark-partities die van Kafka worden gebruikt. Als u de minPartitions optie instelt op een waarde die groter is dan uw Kafka topicPartitions, zal Spark grote Kafka-partities op kleinere delen verdelen. Deze optie kan worden ingesteld op momenten van piekbelastingen, scheeftrekken van gegevens en naarmate uw stroom achterloopt om de verwerkingssnelheid te verhogen. Het komt ten koste van het initialiseren van Kafka-consumenten bij elke trigger, wat de prestaties kan beïnvloeden als u SSL gebruikt bij het maken van verbinding met Kafka. |
kafka.group.id | Een Kafka-consumentengroep-id. | niet ingesteld | [Optioneel] Groeps-id die moet worden gebruikt tijdens het lezen vanuit Kafka. Gebruik dit met voorzichtigheid. Standaard genereert elke query een unieke groeps-id voor het lezen van gegevens. Dit zorgt ervoor dat elke query een eigen consumentengroep heeft die geen interferentie van een andere consument ondervindt en daarom alle partities van de geabonneerde onderwerpen kan lezen. In sommige scenario's (bijvoorbeeld autorisatie op basis van kafka-groepen), kunt u specifieke geautoriseerde groeps-id's gebruiken om gegevens te lezen. U kunt desgewenst de groeps-id instellen. Doe dit echter met extreme voorzichtigheid, omdat dit onverwacht gedrag kan veroorzaken. - Gelijktijdig uitgevoerde query's (zowel batch- als streaming) met dezelfde groeps-id beïnvloeden waarschijnlijk elkaar, waardoor elke query slechts een deel van de gegevens kan lezen. - Dit kan ook gebeuren wanneer query's snel na elkaar worden gestart/opnieuw gestart. Als u dergelijke problemen wilt minimaliseren, stelt u de Kafka-consumentenconfiguratie session.timeout.ms in op zeer klein. |
startingOffsets | vroegste, meest recente | nieuwste | [Optioneel] Het beginpunt waarop een query wordt gestart, ofwel 'vroegst' die afkomstig is van de vroegste verschuivingen, of een json-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de json kan -2 als een offset worden gebruikt om te verwijzen naar vroegste, -1 naar meest recente. Opmerking: Voor batchquery's is de meest recente (impliciet of met behulp van -1 in json) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart en dat hervatten altijd wordt opgehaald van waar de query was gebleven. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste. |
Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.
Schema voor Kafka-records
Het schema van Kafka-records is:
Column | Type |
---|---|
sleutel | binair |
waarde | binair |
onderwerp | tekenreeks |
partitie | int |
offset | long |
timestamp | long |
timestampType | int |
De key
en de value
worden altijd gedeserialiseerd als bytematrices met de ByteArrayDeserializer
. Gebruik DataFrame-bewerkingen (zoals cast("string")
) om de sleutels en waarden expliciet deserialiseren.
Gegevens schrijven naar Kafka
Hier volgt een voorbeeld van een streaming-schrijfbewerking naar Kafka:
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks ondersteunt ook semantiek voor batch-schrijfbewerkingen naar Kafka-gegevenssinks, zoals wordt weergegeven in het volgende voorbeeld:
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Kafka Structured Streaming Writer configureren
Belangrijk
Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients
bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als een Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd, maar zonder IDEMPOTENT_WRITE
ingeschakeld, mislukt de schrijfbewerking met het foutbericht org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
.
Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option(“kafka.enable.idempotence”, “false”)
tijdens het configureren van uw Structured Streaming Writer.
Het schema dat aan de DataStreamWriter is verstrekt, communiceert met de Kafka-sink. U kunt de volgende velden gebruiken:
Kolomnaam | Vereist of optioneel | Type |
---|---|---|
key |
optioneel | STRING of BINARY |
value |
vereist | STRING of BINARY |
headers |
optioneel | ARRAY |
topic |
optioneel (genegeerd als topic is ingesteld als schrijveroptie) |
STRING |
partition |
optioneel | INT |
Hier volgen algemene opties die zijn ingesteld tijdens het schrijven naar Kafka:
Optie | Weergegeven als | Default value | Beschrijving |
---|---|---|---|
kafka.boostrap.servers |
Een door komma's gescheiden lijst van <host:port> |
Geen | [Vereist] De Kafka-configuratie bootstrap.servers . |
topic |
STRING |
niet ingesteld | [Optioneel] Hiermee stelt u het onderwerp in voor alle rijen die moeten worden geschreven. Met deze optie worden alle onderwerpkolommen overschreven die in de gegevens aanwezig zijn. |
includeHeaders |
BOOLEAN |
false |
[Optioneel] Of u de Kafka-headers in de rij wilt opnemen. |
Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.
Metrische Kafka-gegevens ophalen
U kunt het gemiddelde, het minimum en het maximum van het aantal offsets ophalen dat de streamingquery achter de meest recente beschikbare offset ligt tussen alle geabonneerde onderwerpen met de avgOffsetsBehindLatest
maxOffsetsBehindLatest
, en minOffsetsBehindLatest
metrische gegevens. Zie Interactief metrische gegevens lezen.
Notitie
Beschikbaar in Databricks Runtime 9.1 en hoger.
Haal het geschatte totale aantal bytes op dat het queryproces niet heeft verbruikt uit de geabonneerde onderwerpen door de waarde van estimatedTotalBytesBehindLatest
. Deze schatting is gebaseerd op de batches die in de afgelopen 300 seconden zijn verwerkt. Het tijdsbestek waarop de schatting is gebaseerd, kan worden gewijzigd door de optie bytesEstimateWindowLength
in te stellen op een andere waarde. Als u deze bijvoorbeeld wilt instellen op 10 minuten:
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
SSL gebruiken om Azure Databricks te verbinden met Kafka
Als u SSL-verbindingen met Kafka wilt inschakelen, volgt u de instructies in de Confluent-documentatie Versleuteling en verificatie met SSL. U kunt de configuraties opgeven die daar worden beschreven, voorafgegaan door kafka.
, als opties. U geeft bijvoorbeeld de locatie van het vertrouwensarchief op in de eigenschap kafka.ssl.truststore.location
.
Databricks raadt u het volgende aan:
- Sla uw certificaten op in de opslag van cloudobjecten. U kunt de toegang tot de certificaten alleen beperken tot clusters die toegang hebben tot Kafka. Zie Gegevensbeheer met Unity Catalog.
- Sla uw certificaatwachtwoorden op als geheimen in een geheim bereik.
In het volgende voorbeeld worden objectopslaglocaties en Databricks-geheimen gebruikt om een SSL-verbinding in te schakelen:
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Kafka in HDInsight verbinden met Azure Databricks
Maak een HDInsight Kafka-cluster.
Zie Verbinding maken met Kafka in HDInsight via een virtueel Azure-netwerk voor instructies.
Configureer de Kafka-brokers om het juiste adres te adverteren.
Volg de instructies in Kafka configureren voor IP-reclame. Als u Kafka zelf beheert op virtuele Azure-machines, moet u ervoor zorgen dat de
advertised.listeners
configuratie van de brokers is ingesteld op het interne IP-adres van de hosts.Maak een Azure Databricks-cluster.
Koppel het Kafka-cluster aan het Azure Databricks-cluster.
Volg de instructies in virtuele peernetwerken.
Verificatie van service-principal met Microsoft Entra-id en Azure Event Hubs
Azure Databricks ondersteunt de verificatie van Spark-taken met Event Hubs-services. Deze verificatie wordt uitgevoerd via OAuth met Microsoft Entra-id.
Azure Databricks biedt ondersteuning voor Microsoft Entra ID-verificatie met een client-id en geheim in de volgende rekenomgevingen:
- Databricks Runtime 12.2 LTS en hoger op rekenkracht die is geconfigureerd met de modus voor toegang van één gebruiker.
- Databricks Runtime 14.3 LTS en hoger op rekenkracht die is geconfigureerd met de modus voor gedeelde toegang.
- Delta Live Tables-pijplijnen die zijn geconfigureerd zonder Unity Catalog.
Azure Databricks biedt geen ondersteuning voor Microsoft Entra ID-verificatie met een certificaat in een rekenomgeving of in Delta Live Tables-pijplijnen die zijn geconfigureerd met Unity Catalog.
Deze verificatie werkt niet op gedeelde clusters of in Unity Catalog Delta Live Tables.
De Structured Streaming Kafka-connector configureren
Voor het uitvoeren van verificatie met Microsoft Entra-id hebt u de volgende waarden nodig:
Een tenant-id. U vindt dit op het tabblad Services van Microsoft Entra ID .
Een clientID (ook wel toepassings-id genoemd).
Een clientgeheim. Zodra u dit hebt, moet u het toevoegen als een geheim aan uw Databricks-werkruimte. Zie Geheimbeheer om dit geheim toe te voegen.
Een EventHubs-onderwerp. U vindt een lijst met onderwerpen in de sectie Event Hubs onder de sectie Entiteiten op een specifieke Event Hubs-naamruimtepagina . Als u met meerdere onderwerpen wilt werken, kunt u de IAM-rol instellen op Event Hubs-niveau.
Een EventHubs-server. U vindt dit op de overzichtspagina van uw specifieke Event Hubs-naamruimte:
Daarnaast moeten we Kafka vertellen het OAuth SASL-mechanisme te gebruiken (SASL is een algemeen protocol en OAuth is een type SASL-mechanisme):
kafka.security.protocol
moet zijnSASL_SSL
kafka.sasl.mechanism
moet zijnOAUTHBEARER
kafka.sasl.login.callback.handler.class
moet een volledig gekwalificeerde naam van de Java-klasse zijn met de waarde vankafkashaded
de callback-handler voor aanmelding van onze gearceerde Kafka-klasse. Zie het volgende voorbeeld voor de exacte klasse.
Opmerking
Laten we nu eens kijken naar een actief voorbeeld:
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
Mogelijke fouten afhandelen
Streamingopties worden niet ondersteund.
Als u dit verificatiemechanisme probeert te gebruiken in een Delta Live Tables-pijplijn die is geconfigureerd met Unity Catalog, krijgt u mogelijk de volgende fout:
Gebruik een ondersteunde rekenconfiguratie om deze fout op te lossen. Zie Service Principal-verificatie met Microsoft Entra ID en Azure Event Hubs.
Kan geen nieuwe
KafkaAdminClient
maken.Dit is een interne fout die Kafka genereert als een van de volgende verificatieopties onjuist is:
- Client-id (ook wel toepassings-id genoemd)
- Tenant-id
- EventHubs-server
Als u de fout wilt oplossen, controleert u of de waarden juist zijn voor deze opties.
Bovendien ziet u deze fout mogelijk als u de standaardconfiguratieopties wijzigt die in het voorbeeld zijn opgegeven (dat u bent gevraagd om niet te wijzigen), zoals
kafka.security.protocol
.Er worden geen records geretourneerd
Als u uw DataFrame probeert weer te geven of te verwerken, maar geen resultaten krijgt, ziet u het volgende in de gebruikersinterface.
Dit bericht betekent dat de verificatie is geslaagd, maar EventHubs geen gegevens heeft geretourneerd. Een aantal mogelijke (hoewel niet volledig) redenen zijn:
- U hebt het verkeerde EventHubs-onderwerp opgegeven.
- De standaardoptie voor
startingOffsets
kafka-configuratie islatest
en u ontvangt momenteel nog geen gegevens via het onderwerp. U kunt instellenstartingOffsetstoearliest
dat u begint met het lezen van gegevens vanaf de vroegste offsets van Kafka.