Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Op deze pagina worden configuratieopties beschreven voor het lezen van en schrijven naar Apache Kafka met behulp van Structured Streaming in Azure Databricks.
De Azure Databricks Kafka-connector is gebouwd op de Apache Spark Kafka-connector en biedt ondersteuning voor alle standaardconfiguratieopties voor Kafka. Elke optie waaraan kafka. wordt voorafgegaan, wordt rechtstreeks doorgegeven aan de onderliggende Kafka-client. Stelt bijvoorbeeld .option("kafka.max.poll.records", "500") de eigenschap van max.poll.records de Kafka-consument in. Raadpleeg de kafka-configuratiedocumentatie voor de volledige lijst met beschikbare Kafka-eigenschappen.
Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.
Vereiste opties
Zie Kafka voor meer informatie over de vereiste opties.
De volgende optie is vereist voor zowel lezen als schrijven:
| Sleutel | Beschrijving |
|---|---|
kafka.bootstrap.servers |
Een door komma's gescheiden lijst met host:poortadressen voor Kafka-brokers. Hiermee stelt u de eigenschap van bootstrap.servers de Kafka-client in.Als u merkt dat er geen gegevens uit Kafka zijn, controleert u deze lijst met brokeradressen op onjuiste adressen. Als de adreslijst van de broker onjuist is, zijn er mogelijk geen fouten. Kafka-clients gaan ervan uit dat de brokers uiteindelijk beschikbaar zijn en het voor altijd opnieuw proberen wanneer ze netwerkfouten ontvangen. |
Voor kafka-leesbewerkingen moet u ook precies een van de volgende opties opgeven om te bepalen welke onderwerpen u wilt gebruiken:
subscribesubscribePatternassign
Wanneer u naar Kafka schrijft, kunt u desgewenst de topic optie instellen om een doelonderwerp voor alle rijen op te geven. Als dit niet is ingesteld, moet het DataFrame een topic kolom bevatten.
Algemene opties voor lezers
De volgende opties worden vaak gebruikt bij het lezen vanuit Kafka:
| Sleutel | Beschrijving |
|---|---|
minPartitions |
Het minimale aantal partities dat moet worden gelezen uit Kafka. |
maxRecordsPerPartition |
Het maximum aantal records per Spark-partitie. |
failOnDataLoss |
Of de query mislukt wanneer het mogelijk is dat de gegevens verloren zijn gegaan. |
maxOffsetsPerTrigger |
Het maximum aantal offsets dat per triggerinterval wordt verwerkt. |
startingOffsets |
De verschuiving waaruit de query begint met lezen. |
endingOffsets |
Waar kunt u stoppen met lezen voor batchquery's. |
groupIdPrefix |
Het aangepaste voorvoegsel voor de automatisch gegenereerde consumentengroep-id. |
kafka.group.id |
De groeps-id die moet worden gebruikt tijdens het lezen vanuit Kafka. Gebruik dit met voorzichtigheid omdat dit onverwacht gedrag kan veroorzaken. Standaard genereert elke query een unieke groeps-id voor het lezen van gegevens. Dit zorgt ervoor dat elke query een eigen consumentengroep heeft die interferentie van andere consumenten voorkomt en dat elke query alle partities van de geabonneerde onderwerpen kan lezen. In sommige scenario's, zoals verificatie op basis van een Kafka-groep, kunt u specifieke geautoriseerde groeps-id's gebruiken om gegevens te lezen. Query's met dezelfde groeps-id kunnen elkaar beïnvloeden en alleen gedeeltelijke gegevens lezen. Interferentie kan optreden wanneer u gelijktijdige batch- en streamingworkloads uitvoert, of wanneer u query's snel achter elkaar start en opnieuw start. Als u problemen wilt minimaliseren, stelt u de Kafka-consumentenconfiguratie session.timeout.ms zo klein mogelijk in. |
includeHeaders |
Of u Kafka-berichtkoppen wilt opnemen in de uitvoer. |
bytesEstimateWindowLength |
Het tijdvenster dat wordt gebruikt om de resterende bytes te schatten via de estimatedTotalBytesBehindLatest metrische waarde. |
Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.
Algemene opties voor schrijver
De volgende opties worden vaak gebruikt bij het schrijven naar Kafka:
| Sleutel | Beschrijving |
|---|---|
topic |
Hiermee stelt u het onderwerp in voor alle rijen. Dit heeft voorrang op elke topic kolom in de gegevens. |
includeHeaders |
Of kafka-headers in de rij moeten worden opgenomen. |
Belangrijk
Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als uw Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukken schrijfbewerkingen. Los dit op door een upgrade uit te voeren naar Kafka 2.8.0 of hoger, of door de instelling in te stellen .option("kafka.enable.idempotence", "false").
Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.
Verificatieopties
Azure Databricks ondersteunt meerdere verificatiemethoden voor Kafka, waaronder servicereferenties voor Unity Catalog, SASL/SSL en cloudspecifieke opties voor AWS MSK, Azure Event Hubs en Google Cloud Managed Kafka.
Azure Databricks raadt u aan om referenties van de Unity Catalog-service te gebruiken voor verificatie bij door de cloud beheerde Kafka-services:
| Option | Beschrijving |
|---|---|
databricks.serviceCredential |
De naam van een Unity Catalog-servicereferentie voor verificatie bij door de cloud beheerde Kafka-services (AWS MSK, Azure Event Hubs of Google Cloud Managed Kafka). Beschikbaar in Databricks Runtime 16.1 en hoger. |
databricks.serviceCredential.scope |
Het OAuth-bereik voor de servicereferentie. Stel dit alleen in als Azure Databricks het bereik voor uw Kafka-service niet automatisch kan afleiden. |
Wanneer u een Unity Catalog-servicereferentie gebruikt, hoeft u geen SASL-/SSL-opties op te geven, zoals kafka.sasl.mechanism, kafka.sasl.jaas.configof kafka.security.protocol.
Algemene SASL-/SSL-opties zijn:
| Option | Beschrijving |
|---|---|
kafka.security.protocol |
Het protocol dat wordt gebruikt om te communiceren met brokers (bijvoorbeeld SASL_SSL, SSL). PLAINTEXT |
kafka.sasl.mechanism |
Het SASL-mechanisme (bijvoorbeeld PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER). AWS_MSK_IAM |
kafka.sasl.jaas.config |
De configuratietekenreeks voor JAAS-aanmelding. |
kafka.sasl.login.callback.handler.class |
De volledig gekwalificeerde klassenaam van een callback-handler voor aanmelding voor SASL-verificatie. |
kafka.sasl.client.callback.handler.class |
De volledig gekwalificeerde klassenaam van een client callback-handler voor SASL-verificatie. |
kafka.ssl.truststore.location |
De locatie van het SSL-vertrouwensarchiefbestand. |
kafka.ssl.truststore.password |
Het wachtwoord voor het SSL-vertrouwensarchiefbestand. |
kafka.ssl.keystore.location |
De locatie van het SSL-sleutelarchiefbestand. |
kafka.ssl.keystore.password |
Het wachtwoord voor het SSL-sleutelarchiefbestand. |
Zie Verificatie voor volledige instructies voor het instellen van verificatie.
Aanvullende bronnen
- Integratiehandleiding voor Structured Streaming + Kafka (Documentatie voor Apache Spark)
- Apache Kafka-configuraties