Opties

Op deze pagina worden configuratieopties beschreven voor het lezen van en schrijven naar Apache Kafka met behulp van Structured Streaming in Azure Databricks.

De Azure Databricks Kafka-connector is gebouwd op de Apache Spark Kafka-connector en biedt ondersteuning voor alle standaardconfiguratieopties voor Kafka. Elke optie waaraan kafka. wordt voorafgegaan, wordt rechtstreeks doorgegeven aan de onderliggende Kafka-client. Stelt bijvoorbeeld .option("kafka.max.poll.records", "500") de eigenschap van max.poll.records de Kafka-consument in. Raadpleeg de kafka-configuratiedocumentatie voor de volledige lijst met beschikbare Kafka-eigenschappen.

Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.

Vereiste opties

Zie Kafka voor meer informatie over de vereiste opties.

De volgende optie is vereist voor zowel lezen als schrijven:

Sleutel Beschrijving
kafka.bootstrap.servers Een door komma's gescheiden lijst met host:poortadressen voor Kafka-brokers. Hiermee stelt u de eigenschap van bootstrap.servers de Kafka-client in.
Als u merkt dat er geen gegevens uit Kafka zijn, controleert u deze lijst met brokeradressen op onjuiste adressen. Als de adreslijst van de broker onjuist is, zijn er mogelijk geen fouten. Kafka-clients gaan ervan uit dat de brokers uiteindelijk beschikbaar zijn en het voor altijd opnieuw proberen wanneer ze netwerkfouten ontvangen.

Voor kafka-leesbewerkingen moet u ook precies een van de volgende opties opgeven om te bepalen welke onderwerpen u wilt gebruiken:

  • subscribe
  • subscribePattern
  • assign

Wanneer u naar Kafka schrijft, kunt u desgewenst de topic optie instellen om een doelonderwerp voor alle rijen op te geven. Als dit niet is ingesteld, moet het DataFrame een topic kolom bevatten.

Algemene opties voor lezers

De volgende opties worden vaak gebruikt bij het lezen vanuit Kafka:

Sleutel Beschrijving
minPartitions Het minimale aantal partities dat moet worden gelezen uit Kafka.
maxRecordsPerPartition Het maximum aantal records per Spark-partitie.
failOnDataLoss Of de query mislukt wanneer het mogelijk is dat de gegevens verloren zijn gegaan.
maxOffsetsPerTrigger Het maximum aantal offsets dat per triggerinterval wordt verwerkt.
startingOffsets De verschuiving waaruit de query begint met lezen.
endingOffsets Waar kunt u stoppen met lezen voor batchquery's.
groupIdPrefix Het aangepaste voorvoegsel voor de automatisch gegenereerde consumentengroep-id.
kafka.group.id De groeps-id die moet worden gebruikt tijdens het lezen vanuit Kafka.
Gebruik dit met voorzichtigheid omdat dit onverwacht gedrag kan veroorzaken. Standaard genereert elke query een unieke groeps-id voor het lezen van gegevens. Dit zorgt ervoor dat elke query een eigen consumentengroep heeft die interferentie van andere consumenten voorkomt en dat elke query alle partities van de geabonneerde onderwerpen kan lezen. In sommige scenario's, zoals verificatie op basis van een Kafka-groep, kunt u specifieke geautoriseerde groeps-id's gebruiken om gegevens te lezen.
Query's met dezelfde groeps-id kunnen elkaar beïnvloeden en alleen gedeeltelijke gegevens lezen. Interferentie kan optreden wanneer u gelijktijdige batch- en streamingworkloads uitvoert, of wanneer u query's snel achter elkaar start en opnieuw start.
Als u problemen wilt minimaliseren, stelt u de Kafka-consumentenconfiguratie session.timeout.ms zo klein mogelijk in.
includeHeaders Of u Kafka-berichtkoppen wilt opnemen in de uitvoer.
bytesEstimateWindowLength Het tijdvenster dat wordt gebruikt om de resterende bytes te schatten via de estimatedTotalBytesBehindLatest metrische waarde.

Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.

Algemene opties voor schrijver

De volgende opties worden vaak gebruikt bij het schrijven naar Kafka:

Sleutel Beschrijving
topic Hiermee stelt u het onderwerp in voor alle rijen. Dit heeft voorrang op elke topic kolom in de gegevens.
includeHeaders Of kafka-headers in de rij moeten worden opgenomen.

Belangrijk

Databricks Runtime 13.3 LTS en hoger bevat een nieuwere versie van de kafka-clients bibliotheek waarmee idempotente schrijfbewerkingen standaard worden ingeschakeld. Als uw Kafka-sink versie 2.8.0 of lager gebruikt met ACL's die zijn geconfigureerd maar zonder IDEMPOTENT_WRITE ingeschakeld, mislukken schrijfbewerkingen. Los dit op door een upgrade uit te voeren naar Kafka 2.8.0 of hoger, of door de instelling in te stellen .option("kafka.enable.idempotence", "false").

Zie kafka en de integratiehandleiding structured streaming + Kafka voor een volledige lijst met opties voor gestructureerde streaming en sinks.

Verificatieopties

Azure Databricks ondersteunt meerdere verificatiemethoden voor Kafka, waaronder servicereferenties voor Unity Catalog, SASL/SSL en cloudspecifieke opties voor AWS MSK, Azure Event Hubs en Google Cloud Managed Kafka.

Azure Databricks raadt u aan om referenties van de Unity Catalog-service te gebruiken voor verificatie bij door de cloud beheerde Kafka-services:

Option Beschrijving
databricks.serviceCredential De naam van een Unity Catalog-servicereferentie voor verificatie bij door de cloud beheerde Kafka-services (AWS MSK, Azure Event Hubs of Google Cloud Managed Kafka). Beschikbaar in Databricks Runtime 16.1 en hoger.
databricks.serviceCredential.scope Het OAuth-bereik voor de servicereferentie. Stel dit alleen in als Azure Databricks het bereik voor uw Kafka-service niet automatisch kan afleiden.

Wanneer u een Unity Catalog-servicereferentie gebruikt, hoeft u geen SASL-/SSL-opties op te geven, zoals kafka.sasl.mechanism, kafka.sasl.jaas.configof kafka.security.protocol.

Algemene SASL-/SSL-opties zijn:

Option Beschrijving
kafka.security.protocol Het protocol dat wordt gebruikt om te communiceren met brokers (bijvoorbeeld SASL_SSL, SSL). PLAINTEXT
kafka.sasl.mechanism Het SASL-mechanisme (bijvoorbeeld PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER). AWS_MSK_IAM
kafka.sasl.jaas.config De configuratietekenreeks voor JAAS-aanmelding.
kafka.sasl.login.callback.handler.class De volledig gekwalificeerde klassenaam van een callback-handler voor aanmelding voor SASL-verificatie.
kafka.sasl.client.callback.handler.class De volledig gekwalificeerde klassenaam van een client callback-handler voor SASL-verificatie.
kafka.ssl.truststore.location De locatie van het SSL-vertrouwensarchiefbestand.
kafka.ssl.truststore.password Het wachtwoord voor het SSL-vertrouwensarchiefbestand.
kafka.ssl.keystore.location De locatie van het SSL-sleutelarchiefbestand.
kafka.ssl.keystore.password Het wachtwoord voor het SSL-sleutelarchiefbestand.

Zie Verificatie voor volledige instructies voor het instellen van verificatie.

Aanvullende bronnen