Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite werden Konfigurationsoptionen zum Lesen und Schreiben in Apache Kafka mithilfe von Structured Streaming auf Azure Databricks beschrieben.
Der Azure Databricks Kafka Connector basiert auf dem Apache Spark Kafka Connector und unterstützt alle standardmäßigen Kafka-Konfigurationsoptionen. Jede Option, die dem zugrunde liegenden Kafka-Client vorangestellt kafka. ist, wird direkt an den zugrunde liegenden Kafka-Client übergeben. Legt beispielsweise .option("kafka.max.poll.records", "500") die Eigenschaft des Kafka-Verbrauchers max.poll.records fest. Die vollständige Liste der verfügbaren Kafka-Eigenschaften finden Sie in der Kafka-Konfigurationsdokumentation .
Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.
Erforderliche Optionen
Ausführliche Informationen zu den erforderlichen Optionen finden Sie unter Kafka.
Die folgende Option ist sowohl für das Lesen als auch für das Schreiben erforderlich:
| Schlüssel | Beschreibung |
|---|---|
kafka.bootstrap.servers |
Eine durch Trennzeichen getrennte Liste der Host:Port-Adressen für Kafka-Broker. Legt die Eigenschaft des bootstrap.servers Kafka-Clients fest.Wenn Sie feststellen, dass keine Daten aus Kafka vorhanden sind, überprüfen Sie diese Brokeradressenliste auf falsche Adressen. Wenn die Brokeradressliste falsch ist, treten möglicherweise keine Fehler auf. Kafka-Clients gehen davon aus, dass die Broker irgendwann verfügbar sind, und versuchen Sie es für immer, wenn sie Netzwerkfehler erhalten. |
Für Kafka-Lesevorgänge müssen Sie auch genau eine der folgenden Optionen angeben, um zu identifizieren, welche Themen verwendet werden sollen:
subscribesubscribePatternassign
Beim Schreiben in Kafka können Sie optional die topic Option festlegen, um ein Zielthema für alle Zeilen anzugeben. Wenn sie nicht festgelegt ist, muss der DataFrame eine topic Spalte enthalten.
Allgemeine Leseoptionen
Die folgenden Optionen werden häufig beim Lesen von Kafka verwendet:
| Schlüssel | Beschreibung |
|---|---|
minPartitions |
Die Mindestanzahl der Partitionen, die aus Kafka gelesen werden sollen. |
maxRecordsPerPartition |
Die maximale Anzahl von Datensätzen pro Spark-Partition. |
failOnDataLoss |
Gibt an, ob die Abfrage fehlschlägt, wenn die Daten verloren gingen. |
maxOffsetsPerTrigger |
Die maximale Anzahl von Offsets, die pro Triggerintervall verarbeitet werden. |
startingOffsets |
Der Offset, von dem die Abfrage mit dem Lesen beginnt. |
endingOffsets |
Wo kann das Lesen für Batchabfragen beendet werden. |
groupIdPrefix |
Das angepasste Präfix für die automatisch generierte Consumergruppen-ID. |
kafka.group.id |
Die beim Lesen von Kafka zu verwendende Gruppen-ID. Verwenden Sie dies mit Vorsicht, da es zu unerwartetem Verhalten führen kann. Standardmäßig generiert jede Abfrage eine eindeutige Gruppen-ID zum Lesen von Daten. Dadurch wird sichergestellt, dass jede Abfrage über eine eigene Verbrauchergruppe verfügt, die Störungen von anderen Verbrauchern verhindert, und jede Abfrage kann alle Partitionen der abonnierten Themen lesen. In einigen Szenarien, z. B. die gruppenbasierte Kafka-Autorisierung, können Sie bestimmte autorisierte Gruppen-IDs verwenden, um Daten zu lesen. Abfragen mit derselben Gruppen-ID können sich gegenseitig beeinträchtigen und nur Teildaten lesen. Störungen können auftreten, wenn Sie gleichzeitige Batch- und Streamingworkloads ausführen oder Abfragen in schneller Folge starten und neu starten. Um Probleme zu minimieren, legen Sie die Kafka-Consumerkonfiguration session.timeout.ms so fest, dass sie sehr klein ist. |
includeHeaders |
Gibt an, ob Kafka-Nachrichtenkopfzeilen in die Ausgabe eingeschlossen werden sollen. |
bytesEstimateWindowLength |
Das Zeitfenster, das verwendet wird, um verbleibende Bytes über die estimatedTotalBytesBehindLatest Metrik zu schätzen. |
Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.
Allgemeine Schreiboptionen
Die folgenden Optionen werden häufig beim Schreiben in Kafka verwendet:
| Schlüssel | Beschreibung |
|---|---|
topic |
Legt das Thema für alle Zeilen fest. Dies hat Vorrang vor jeder topic Spalte in den Daten. |
includeHeaders |
Gibt an, ob Kafka-Kopfzeilen in die Zeile eingeschlossen werden sollen. |
Von Bedeutung
In Databricks Runtime 13.3 LTS und höher ist eine neuere Version der kafka-clients-Bibliothek enthalten, die standardmäßig idempotente Schreibvorgänge ermöglicht. Wenn ihre Kafka-Spüle Version 2.8.0 oder darunter mit konfigurierten ACLs verwendet, aber ohne IDEMPOTENT_WRITE aktivierte Schreibvorgänge fehlschlägt. Beheben Sie dies, indem Sie ein Upgrade auf Kafka 2.8.0 oder höher oder durch Festlegen durchführen .option("kafka.enable.idempotence", "false").
Eine vollständige Liste der Optionen für strukturiertes Streaming und senken Sie unter Kafka und dem Structured Streaming + Kafka Integration Guide.
Authentifizierungsoptionen
Azure Databricks unterstützt mehrere Authentifizierungsmethoden für Kafka, einschließlich Unity Catalog-Dienstanmeldeinformationen, SASL/SSL und cloudspezifische Optionen für AWS MSK, Azure Event Hubs und Google Cloud Managed Kafka.
Azure Databricks empfiehlt die Verwendung von Unity Catalog-Dienstanmeldeinformationen für die Authentifizierung bei cloudverwalteten Kafka-Diensten:
| Auswahl | Beschreibung |
|---|---|
databricks.serviceCredential |
Der Name einer Unity-Katalogdienstanmeldeinformationen für die Authentifizierung bei cloudverwalteten Kafka-Diensten (AWS MSK, Azure Event Hubs oder Google Cloud Managed Kafka). Verfügbar in Databricks Runtime 16.1 und höher. |
databricks.serviceCredential.scope |
Der OAuth-Bereich für die Dienstanmeldeinformationen. Legen Sie dies nur fest, wenn Azure Databricks den Umfang Ihres Kafka-Diensts nicht automatisch ableiten kann. |
Wenn Sie eine Unity Catalog-Dienstanmeldeinformationen verwenden, müssen Sie keine SASL/SSL-Optionen wie kafka.sasl.mechanism, oder kafka.sasl.jaas.configkafka.security.protocol.
Allgemeine SASL/SSL-Optionen umfassen:
| Auswahl | Beschreibung |
|---|---|
kafka.security.protocol |
Das Protokoll, das für die Kommunikation mit Brokern verwendet wird (z. BSASL_SSL. , , SSLPLAINTEXT). |
kafka.sasl.mechanism |
Der SASL-Mechanismus (zPLAIN. B. , , SCRAM-SHA-256SCRAM-SHA-512, , ). AWS_MSK_IAMOAUTHBEARER |
kafka.sasl.jaas.config |
Die JAAS-Anmeldekonfigurationszeichenfolge. |
kafka.sasl.login.callback.handler.class |
Der vollqualifizierte Klassenname eines Anmelderückrufhandlers für die SASL-Authentifizierung. |
kafka.sasl.client.callback.handler.class |
Der vollqualifizierte Klassenname eines Clientrückrufhandlers für die SASL-Authentifizierung. |
kafka.ssl.truststore.location |
Der Speicherort der SSL-Vertrauensspeicherdatei. |
kafka.ssl.truststore.password |
Das Kennwort für die SSL-Vertrauensspeicherdatei. |
kafka.ssl.keystore.location |
Der Speicherort der SSL-Schlüsselspeicherdatei. |
kafka.ssl.keystore.password |
Das Kennwort für die SSL-Schlüsselspeicherdatei. |
Vollständige Anweisungen zum Einrichten der Authentifizierung finden Sie unter "Authentifizierung".
Weitere Ressourcen
- Structured Streaming + Kafka Integration Guide (Apache Spark-Dokumentation)
- Apache Kafka-Konfigurationen