Delen via


Streamverwerking met Apache Kafka en Azure Databricks

In dit artikel wordt beschreven hoe u Apache Kafka kunt gebruiken als bron of sink bij het uitvoeren van structured streaming-workloads in Azure Databricks.

Zie de Kafka-documentatie voor meer Kafka.

Gegevens lezen uit Kafka

Hier volgt een voorbeeld voor een streaming-leesbewerking uit Kafka:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks biedt ook ondersteuning voor semantiek voor batchleesbewerkingen voor Kafka-gegevensbronnen, zoals wordt weergegeven in het volgende voorbeeld:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

Voor het laden van incrementele batches raadt Databricks het gebruik van Kafka aan met Trigger.AvailableNow. Zie Incrementele batchverwerking configureren.

In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks een SQL-functie voor het lezen van Kafka-gegevens. Streamen met SQL wordt alleen ondersteund in Delta Live Tables of met streamingtabellen in Databricks SQL. Zie read_kafka functie met tabelwaarde.

Kafka Structured Streaming-lezer configureren

Azure Databricks biedt het trefwoord als een gegevensindeling voor het kafka configureren van verbindingen met Kafka 0.10+.

Hier volgen de meest voorkomende configuraties voor Kafka:

Er zijn meerdere manieren waarop u kunt opgeven op welke onderwerpen u zich wilt abonneren. U moet slechts een van deze parameters opgeven:

Optie Weergegeven als Beschrijving
abonneren Een door komma's gescheiden lijst met onderwerpen. De lijst met onderwerpen waarop u zich wilt abonneren.
subscribePattern Java regex-tekenreeks. Het patroon dat wordt gebruikt om u te abonneren op een of meer onderwerpen.
toewijzen JSON-tekenreeks {"topicA":[0,1],"topic":[2,4]}. Specifieke topicPartitions die moeten worden gebruikt.

Andere belangrijke configuraties:

Optie Weergegeven als Standaardwaarde Beschrijving
kafka.bootstrap.servers Door komma's gescheiden lijst met host:poort. empty [Vereist] De Kafka-configuratie bootstrap.servers . Als u merkt dat er geen gegevens uit Kafka zijn, controleert u eerst de adreslijst van de broker. Als de adreslijst van de broker onjuist is, zijn er mogelijk geen fouten. Dit komt doordat de Kafka-client ervan uitgaat dat de brokers uiteindelijk beschikbaar komen en in het geval van netwerkfouten voor altijd opnieuw proberen.
failOnDataLoss true of false. true [Optioneel] Of de query mislukt wanneer het mogelijk is dat de gegevens verloren zijn gegaan. Query's kunnen gegevens uit Kafka permanent niet lezen vanwege veel scenario's, zoals verwijderde onderwerpen, afkapping van onderwerpen voordat ze worden verwerkt, enzovoort. We proberen conservatief te schatten of gegevens mogelijk verloren zijn gegaan of niet. Soms kan dit valse waarschuwingen veroorzaken. Stel deze optie in op false als deze niet werkt zoals verwacht of als u wilt dat de query blijft verwerken ondanks gegevensverlies.
minPartitions Geheel getal >= 0, 0 = uitgeschakeld. 0 (uitgeschakeld) [Optioneel] Minimaal aantal partities dat moet worden gelezen uit Kafka. U kunt Spark configureren voor het gebruik van een willekeurig minimum aan partities om vanuit Kafka te lezen met behulp van de minPartitions optie. Normaal gesproken heeft Spark een 1-1 toewijzing van Kafka topicPartitions aan Spark-partities die van Kafka worden gebruikt. Als u de minPartitions optie instelt op een waarde die groter is dan uw Kafka topicPartitions, zal Spark grote Kafka-partities op kleinere delen verdelen. Deze optie kan worden ingesteld op momenten van piekbelastingen, scheeftrekken van gegevens en naarmate uw stroom achterloopt om de verwerkingssnelheid te verhogen. Het komt ten koste van het initialiseren van Kafka-consumenten bij elke trigger, wat de prestaties kan beïnvloeden als u SSL gebruikt bij het maken van verbinding met Kafka.
kafka.group.id Een Kafka-consumentengroep-id. niet ingesteld [Optioneel] Groeps-id die moet worden gebruikt tijdens het lezen vanuit Kafka. Gebruik dit met voorzichtigheid. Standaard genereert elke query een unieke groeps-id voor het lezen van gegevens. Dit zorgt ervoor dat elke query een eigen consumentengroep heeft die geen interferentie van een andere consument ondervindt en daarom alle partities van de geabonneerde onderwerpen kan lezen. In sommige scenario's (bijvoorbeeld autorisatie op basis van kafka-groepen), kunt u specifieke geautoriseerde groeps-id's gebruiken om gegevens te lezen. U kunt desgewenst de groeps-id instellen. Doe dit echter met extreme voorzichtigheid, omdat dit onverwacht gedrag kan veroorzaken.

- Gelijktijdig uitgevoerde query's (zowel batch- als streaming) met dezelfde groeps-id beïnvloeden waarschijnlijk elkaar, waardoor elke query slechts een deel van de gegevens kan lezen.
- Dit kan ook gebeuren wanneer query's snel na elkaar worden gestart/opnieuw gestart. Als u dergelijke problemen wilt minimaliseren, stelt u de Kafka-consumentenconfiguratie session.timeout.ms in op zeer klein.
startingOffsets vroegste, meest recente nieuwste [Optioneel] Het beginpunt waarop een query wordt gestart, ofwel 'vroegst' die afkomstig is van de vroegste verschuivingen, of een json-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de json kan -2 als een offset worden gebruikt om te verwijzen naar vroegste, -1 naar meest recente. Opmerking: Voor batchquery's is de meest recente (impliciet of met behulp van -1 in json) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart en dat hervatten altijd wordt opgehaald van waar de query was gebleven. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste.

Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.

Schema voor Kafka-records

Het schema van Kafka-records is:

Column Type
sleutel binair
waarde binair
onderwerp tekenreeks
partitie int
offset long
timestamp long
timestampType int

De key en de value worden altijd gedeserialiseerd als bytematrices met de ByteArrayDeserializer. Gebruik DataFrame-bewerkingen (zoals cast("string")) om de sleutels en waarden expliciet deserialiseren.

Gegevens schrijven naar Kafka

Hier volgt een voorbeeld van een streaming-schrijfbewerking naar Kafka:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks ondersteunt ook semantiek voor batch-schrijfbewerkingen naar Kafka-gegevenssinks, zoals wordt weergegeven in het volgende voorbeeld:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Kafka Structured Streaming Writer configureren

Belangrijk

Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als een Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd, maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukt de schrijfbewerking met het foutbericht org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Los deze fout op door een upgrade uit te voeren naar Kafka versie 2.8.0 of hoger, of door de instelling .option(“kafka.enable.idempotence”, “false”) tijdens het configureren van uw Structured Streaming Writer.

Het schema dat aan de DataStreamWriter is verstrekt, communiceert met de Kafka-sink. U kunt de volgende velden gebruiken:

Kolomnaam Vereist of optioneel Type
key optioneel STRING of BINARY
value vereist STRING of BINARY
headers optioneel ARRAY
topic optioneel (genegeerd als topic is ingesteld als schrijveroptie) STRING
partition optioneel INT

Hier volgen algemene opties die zijn ingesteld tijdens het schrijven naar Kafka:

Optie Weergegeven als Default value Beschrijving
kafka.boostrap.servers Een door komma's gescheiden lijst van <host:port> Geen [Vereist] De Kafka-configuratie bootstrap.servers .
topic STRING niet ingesteld [Optioneel] Hiermee stelt u het onderwerp in voor alle rijen die moeten worden geschreven. Met deze optie worden alle onderwerpkolommen overschreven die in de gegevens aanwezig zijn.
includeHeaders BOOLEAN false [Optioneel] Of u de Kafka-headers in de rij wilt opnemen.

Zie de integratiehandleiding voor Structured Streaming Kafka voor andere optionele configuraties.

Metrische Kafka-gegevens ophalen

U kunt het gemiddelde, het minimum en het maximum van het aantal offsets ophalen dat de streamingquery achter de meest recente beschikbare offset ligt tussen alle geabonneerde onderwerpen met de avgOffsetsBehindLatestmaxOffsetsBehindLatest, en minOffsetsBehindLatest metrische gegevens. Zie Interactief metrische gegevens lezen.

Notitie

Beschikbaar in Databricks Runtime 9.1 en hoger.

Haal het geschatte totale aantal bytes op dat het queryproces niet heeft verbruikt uit de geabonneerde onderwerpen door de waarde van estimatedTotalBytesBehindLatest. Deze schatting is gebaseerd op de batches die in de afgelopen 300 seconden zijn verwerkt. Het tijdsbestek waarop de schatting is gebaseerd, kan worden gewijzigd door de optie bytesEstimateWindowLength in te stellen op een andere waarde. Als u deze bijvoorbeeld wilt instellen op 10 minuten:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

Als u de stream uitvoert in een notebook, ziet u deze metrische gegevens op het tabblad Onbewerkte gegevens in het voortgangsdashboard voor streamingquery's:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

SSL gebruiken om Azure Databricks te verbinden met Kafka

Als u SSL-verbindingen met Kafka wilt inschakelen, volgt u de instructies in de Confluent-documentatie Versleuteling en verificatie met SSL. U kunt de configuraties opgeven die daar worden beschreven, voorafgegaan door kafka., als opties. U geeft bijvoorbeeld de locatie van het vertrouwensarchief op in de eigenschap kafka.ssl.truststore.location.

Databricks raadt u het volgende aan:

In het volgende voorbeeld worden objectopslaglocaties en Databricks-geheimen gebruikt om een SSL-verbinding in te schakelen:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

Kafka in HDInsight verbinden met Azure Databricks

  1. Maak een HDInsight Kafka-cluster.

    Zie Verbinding maken met Kafka in HDInsight via een virtueel Azure-netwerk voor instructies.

  2. Configureer de Kafka-brokers om het juiste adres te adverteren.

    Volg de instructies in Kafka configureren voor IP-reclame. Als u Kafka zelf beheert op virtuele Azure-machines, moet u ervoor zorgen dat de advertised.listeners configuratie van de brokers is ingesteld op het interne IP-adres van de hosts.

  3. Maak een Azure Databricks-cluster.

  4. Koppel het Kafka-cluster aan het Azure Databricks-cluster.

    Volg de instructies in virtuele peernetwerken.

Verificatie van service-principal met Microsoft Entra-id en Azure Event Hubs

Azure Databricks ondersteunt de verificatie van Spark-taken met Event Hubs-services. Deze verificatie wordt uitgevoerd via OAuth met Microsoft Entra-id.

AAD-verificatiediagram

Azure Databricks biedt ondersteuning voor Microsoft Entra ID-verificatie met een client-id en geheim in de volgende rekenomgevingen:

  • Databricks Runtime 12.2 LTS en hoger op rekenkracht die is geconfigureerd met de modus voor toegang van één gebruiker.
  • Databricks Runtime 14.3 LTS en hoger op rekenkracht die is geconfigureerd met de modus voor gedeelde toegang.
  • Delta Live Tables-pijplijnen die zijn geconfigureerd zonder Unity Catalog.

Azure Databricks biedt geen ondersteuning voor Microsoft Entra ID-verificatie met een certificaat in een rekenomgeving of in Delta Live Tables-pijplijnen die zijn geconfigureerd met Unity Catalog.

Deze verificatie werkt niet op gedeelde clusters of in Unity Catalog Delta Live Tables.

De Structured Streaming Kafka-connector configureren

Voor het uitvoeren van verificatie met Microsoft Entra-id hebt u de volgende waarden nodig:

  • Een tenant-id. U vindt dit op het tabblad Services van Microsoft Entra ID .

  • Een clientID (ook wel toepassings-id genoemd).

  • Een clientgeheim. Zodra u dit hebt, moet u het toevoegen als een geheim aan uw Databricks-werkruimte. Zie Geheimbeheer om dit geheim toe te voegen.

  • Een EventHubs-onderwerp. U vindt een lijst met onderwerpen in de sectie Event Hubs onder de sectie Entiteiten op een specifieke Event Hubs-naamruimtepagina . Als u met meerdere onderwerpen wilt werken, kunt u de IAM-rol instellen op Event Hubs-niveau.

  • Een EventHubs-server. U vindt dit op de overzichtspagina van uw specifieke Event Hubs-naamruimte:

    Event Hubs-naamruimte

Daarnaast moeten we Kafka vertellen het OAuth SASL-mechanisme te gebruiken (SASL is een algemeen protocol en OAuth is een type SASL-mechanisme):

  • kafka.security.protocol moet zijn SASL_SSL
  • kafka.sasl.mechanism moet zijn OAUTHBEARER
  • kafka.sasl.login.callback.handler.class moet een volledig gekwalificeerde naam van de Java-klasse zijn met de waarde van kafkashaded de callback-handler voor aanmelding van onze gearceerde Kafka-klasse. Zie het volgende voorbeeld voor de exacte klasse.

Opmerking

Laten we nu eens kijken naar een actief voorbeeld:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

Mogelijke fouten afhandelen

  • Streamingopties worden niet ondersteund.

    Als u dit verificatiemechanisme probeert te gebruiken in een Delta Live Tables-pijplijn die is geconfigureerd met Unity Catalog, krijgt u mogelijk de volgende fout:

    Niet-ondersteunde streamingfout

    Gebruik een ondersteunde rekenconfiguratie om deze fout op te lossen. Zie Service Principal-verificatie met Microsoft Entra ID en Azure Event Hubs.

  • Kan geen nieuwe KafkaAdminClientmaken.

    Dit is een interne fout die Kafka genereert als een van de volgende verificatieopties onjuist is:

    • Client-id (ook wel toepassings-id genoemd)
    • Tenant-id
    • EventHubs-server

    Als u de fout wilt oplossen, controleert u of de waarden juist zijn voor deze opties.

    Bovendien ziet u deze fout mogelijk als u de standaardconfiguratieopties wijzigt die in het voorbeeld zijn opgegeven (dat u bent gevraagd om niet te wijzigen), zoals kafka.security.protocol.

  • Er worden geen records geretourneerd

    Als u uw DataFrame probeert weer te geven of te verwerken, maar geen resultaten krijgt, ziet u het volgende in de gebruikersinterface.

    Geen resultatenbericht

    Dit bericht betekent dat de verificatie is geslaagd, maar EventHubs geen gegevens heeft geretourneerd. Een aantal mogelijke (hoewel niet volledig) redenen zijn:

    • U hebt het verkeerde EventHubs-onderwerp opgegeven.
    • De standaardoptie voor startingOffsets kafka-configuratie is latesten u ontvangt momenteel nog geen gegevens via het onderwerp. U kunt instellen startingOffsetstoearliest dat u begint met het lezen van gegevens vanaf de vroegste offsets van Kafka.