Options

Auf dieser Seite werden Konfigurationsoptionen zum Lesen und Schreiben in Apache Kafka mithilfe von Structured Streaming auf Azure Databricks beschrieben.

Der Azure Databricks Kafka Connector basiert auf dem Apache Spark Kafka Connector und unterstützt alle standardmäßigen Kafka-Konfigurationsoptionen. Jede Option, die dem zugrunde liegenden Kafka-Client vorangestellt kafka. ist, wird direkt an den zugrunde liegenden Kafka-Client übergeben. Legt beispielsweise .option("kafka.max.poll.records", "500") die Eigenschaft des Kafka-Verbrauchers max.poll.records fest. Die vollständige Liste der verfügbaren Kafka-Eigenschaften finden Sie in der Kafka-Konfigurationsdokumentation .

Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.

Erforderliche Optionen

Ausführliche Informationen zu den erforderlichen Optionen finden Sie unter Kafka.

Die folgende Option ist sowohl für das Lesen als auch für das Schreiben erforderlich:

Schlüssel Beschreibung
kafka.bootstrap.servers Eine durch Trennzeichen getrennte Liste der Host:Port-Adressen für Kafka-Broker. Legt die Eigenschaft des bootstrap.servers Kafka-Clients fest.
Wenn Sie feststellen, dass keine Daten aus Kafka vorhanden sind, überprüfen Sie diese Brokeradressenliste auf falsche Adressen. Wenn die Brokeradressliste falsch ist, treten möglicherweise keine Fehler auf. Kafka-Clients gehen davon aus, dass die Broker irgendwann verfügbar sind, und versuchen Sie es für immer, wenn sie Netzwerkfehler erhalten.

Für Kafka-Lesevorgänge müssen Sie auch genau eine der folgenden Optionen angeben, um zu identifizieren, welche Themen verwendet werden sollen:

  • subscribe
  • subscribePattern
  • assign

Beim Schreiben in Kafka können Sie optional die topic Option festlegen, um ein Zielthema für alle Zeilen anzugeben. Wenn sie nicht festgelegt ist, muss der DataFrame eine topic Spalte enthalten.

Allgemeine Leseoptionen

Die folgenden Optionen werden häufig beim Lesen von Kafka verwendet:

Schlüssel Beschreibung
minPartitions Die Mindestanzahl der Partitionen, die aus Kafka gelesen werden sollen.
maxRecordsPerPartition Die maximale Anzahl von Datensätzen pro Spark-Partition.
failOnDataLoss Gibt an, ob die Abfrage fehlschlägt, wenn die Daten verloren gingen.
maxOffsetsPerTrigger Die maximale Anzahl von Offsets, die pro Triggerintervall verarbeitet werden.
startingOffsets Der Offset, von dem die Abfrage mit dem Lesen beginnt.
endingOffsets Wo kann das Lesen für Batchabfragen beendet werden.
groupIdPrefix Das angepasste Präfix für die automatisch generierte Consumergruppen-ID.
kafka.group.id Die beim Lesen von Kafka zu verwendende Gruppen-ID.
Verwenden Sie dies mit Vorsicht, da es zu unerwartetem Verhalten führen kann. Standardmäßig generiert jede Abfrage eine eindeutige Gruppen-ID zum Lesen von Daten. Dadurch wird sichergestellt, dass jede Abfrage über eine eigene Verbrauchergruppe verfügt, die Störungen von anderen Verbrauchern verhindert, und jede Abfrage kann alle Partitionen der abonnierten Themen lesen. In einigen Szenarien, z. B. die gruppenbasierte Kafka-Autorisierung, können Sie bestimmte autorisierte Gruppen-IDs verwenden, um Daten zu lesen.
Abfragen mit derselben Gruppen-ID können sich gegenseitig beeinträchtigen und nur Teildaten lesen. Störungen können auftreten, wenn Sie gleichzeitige Batch- und Streamingworkloads ausführen oder Abfragen in schneller Folge starten und neu starten.
Um Probleme zu minimieren, legen Sie die Kafka-Consumerkonfiguration session.timeout.ms so fest, dass sie sehr klein ist.
includeHeaders Gibt an, ob Kafka-Nachrichtenkopfzeilen in die Ausgabe eingeschlossen werden sollen.
bytesEstimateWindowLength Das Zeitfenster, das verwendet wird, um verbleibende Bytes über die estimatedTotalBytesBehindLatest Metrik zu schätzen.

Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.

Allgemeine Schreiboptionen

Die folgenden Optionen werden häufig beim Schreiben in Kafka verwendet:

Schlüssel Beschreibung
topic Legt das Thema für alle Zeilen fest. Dies hat Vorrang vor jeder topic Spalte in den Daten.
includeHeaders Gibt an, ob Kafka-Kopfzeilen in die Zeile eingeschlossen werden sollen.

Von Bedeutung

In Databricks Runtime 13.3 LTS und höher ist eine neuere Version der kafka-clients-Bibliothek enthalten, die standardmäßig idempotente Schreibvorgänge ermöglicht. Wenn ihre Kafka-Spüle Version 2.8.0 oder darunter mit konfigurierten ACLs verwendet, aber ohne IDEMPOTENT_WRITE aktivierte Schreibvorgänge fehlschlägt. Beheben Sie dies, indem Sie ein Upgrade auf Kafka 2.8.0 oder höher oder durch Festlegen durchführen .option("kafka.enable.idempotence", "false").

Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.

Authentifizierungsoptionen

Azure Databricks unterstützt mehrere Authentifizierungsmethoden für Kafka, einschließlich Unity Catalog-Dienstanmeldeinformationen, SASL/SSL und cloudspezifische Optionen für AWS MSK, Azure Event Hubs und Google Cloud Managed Kafka.

Azure Databricks empfiehlt die Verwendung von Unity Catalog-Dienstanmeldeinformationen für die Authentifizierung bei cloudverwalteten Kafka-Diensten:

Auswahl Beschreibung
databricks.serviceCredential Der Name einer Unity-Katalogdienstanmeldeinformationen für die Authentifizierung bei cloudverwalteten Kafka-Diensten (AWS MSK, Azure Event Hubs oder Google Cloud Managed Kafka). Verfügbar in Databricks Runtime 16.1 und höher.
databricks.serviceCredential.scope Der OAuth-Bereich für die Dienstanmeldeinformationen. Legen Sie dies nur fest, wenn Azure Databricks den Umfang Ihres Kafka-Diensts nicht automatisch ableiten kann.

Wenn Sie eine Unity Catalog-Dienstanmeldeinformationen verwenden, müssen Sie keine SASL/SSL-Optionen wie kafka.sasl.mechanism, oder kafka.sasl.jaas.configkafka.security.protocol.

Allgemeine SASL/SSL-Optionen umfassen:

Auswahl Beschreibung
kafka.security.protocol Das Protokoll, das für die Kommunikation mit Brokern verwendet wird (z. BSASL_SSL. , , SSLPLAINTEXT).
kafka.sasl.mechanism Der SASL-Mechanismus (zPLAIN. B. , , SCRAM-SHA-256SCRAM-SHA-512, , ). AWS_MSK_IAMOAUTHBEARER
kafka.sasl.jaas.config Die JAAS-Anmeldekonfigurationszeichenfolge.
kafka.sasl.login.callback.handler.class Der vollqualifizierte Klassenname eines Anmelderückrufhandlers für die SASL-Authentifizierung.
kafka.sasl.client.callback.handler.class Der vollqualifizierte Klassenname eines Clientrückrufhandlers für die SASL-Authentifizierung.
kafka.ssl.truststore.location Der Speicherort der SSL-Vertrauensspeicherdatei.
kafka.ssl.truststore.password Das Kennwort für die SSL-Vertrauensspeicherdatei.
kafka.ssl.keystore.location Der Speicherort der SSL-Schlüsselspeicherdatei.
kafka.ssl.keystore.password Das Kennwort für die SSL-Schlüsselspeicherdatei.

Vollständige Anweisungen zum Einrichten der Authentifizierung finden Sie unter "Authentifizierung".

Weitere Ressourcen