Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
GILT FÜR: NoSQL
Kafka Connect für Azure Cosmos DB ist ein Connector zum Lesen und Schreiben von Daten in Azure Cosmos DB. Mit dem Azure Cosmos DB-Sink-Connector können Sie Daten aus Apache Kafka-Topics in eine Azure Cosmos DB-Datenbank exportieren. Der Connector ruft Daten aus Kafka ab, um – basierend auf dem Themenabonnement – in Container in der Datenbank zu schreiben.
Voraussetzungen
- Beginnen Sie mit dem Confluent Platform-Setup, weil Ihnen diese Plattform eine vollständige Umgebung für Ihre Arbeit bietet. Wenn Sie Confluent Platform nicht verwenden möchten, müssen Sie Apache Kafka und Kafka Connect selbst installieren und konfigurieren. Außerdem müssen Sie die Azure Cosmos DB-Connectors manuell installieren und konfigurieren.
- Erstellen Sie ein Azure Cosmos DB-Konto und sehen Sie sich den Einrichtungsleitfaden für Container an.
- Bash-Shell, die in GitHub Codespaces bzw. unter Mac, Ubuntu und Windows mit WSL2 getestet wird. Diese Shell funktioniert nicht in Cloud Shell oder WSL1.
- Herunterladen von Java 11 oder höher
- Herunterladen von Maven
Installieren des Senkenconnectors
Wenn Sie das empfohlene Confluent Platform-Setup verwenden, ist der Azure Cosmos DB Sink-Connector in der Installation enthalten, und Sie können diesen Schritt überspringen.
Andernfalls können Sie die JAR-Datei aus dem neuesten Release herunterladen oder dieses Repository packen, um eine neue JAR-Datei zu erstellen. Informationen zum manuellen Installieren des Connectors mithilfe der JAR-Datei finden Sie in diesen Anleitungen. Sie können auch eine neue JAR-Datei aus dem Quellcode packen.
# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar
Erstellen eines Kafka-Themas und Schreiben von Daten
Wenn Sie Confluent Platform verwenden, ist die einfachste Möglichkeit zum Erstellen eines Kafka-Themas die Nutzung der bereitgestellten Control Center-Benutzeroberfläche. Andernfalls können Sie ein Kafka-Thema mithilfe der folgenden Syntax manuell erstellen:
./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
In diesem Szenario erstellen Sie ein Kafka-Thema namens „hotels“ und schreiben in das Thema JSON-Daten, die nicht in Schemas eingebettet sind. Informationen zum Erstellen eines Themas in Control Center finden Sie im Confluent-Leitfaden.
Starten Sie als Nächstes den Kafka-Konsolenproducer, um einige Zeilen in das Thema „hotels“ zu schreiben.
# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels
# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels
Geben Sie im Konsolenproducer Folgendes ein:
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
Die drei eingegebenen Datensätze werden im Kafka-Thema „hotels“ im JSON-Format veröffentlicht.
Erstellen des Senkenconnectors
Erstellen Sie einen Azure Cosmos DB-Sink-Connector in Kafka Connect. Der folgende JSON-Text definiert die Konfiguration für den Senkenconnector. Sie müssen die Werte für die Eigenschaften connect.cosmos.connection.endpoint
und connect.cosmos.master.key
ersetzen, die Sie in den Voraussetzungen im Azure Cosmos DB-Setupleitfaden gespeichert haben sollten.
Weitere Informationen zu den einzelnen Konfigurationseigenschaften finden Sie unter Senkeneigenschaften.
{
"name": "cosmosdb-sink-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
"tasks.max": "1",
"topics": [
"hotels"
],
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "hotels#kafka"
}
}
Nachdem Sie alle Werte ausgefüllt haben, speichern Sie die JSON-Datei lokal. Sie können diese Datei zum Erstellen des Connectors mithilfe der REST-API verwenden.
Erstellen eines Connectors mithilfe von Control Center
Eine einfache Möglichkeit zum Erstellen des Connectors bietet die Verwendung der Control Center-Webseite. Folgen Sie diesem Installationsleitfaden, um einen Connector über Control Center zu erstellen. Verwenden Sie statt der Option DatagenConnector
die Kachel CosmosDBSinkConnector
. Geben Sie beim Konfigurieren des Senkenconnectors die Werte so wie in der JSON-Datei ein.
Alternativ können Sie auf der Connectorseite mithilfe der Option Upload connector config file (Konfigurationsdatei für den Connector hochladen) die zuvor erstellte JSON-Datei hochladen.
Erstellen des Connectors mithilfe der REST-API
Erstellen Sie den Senkenconnector mithilfe der Connect-REST-API:
# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors
Bestätigen von Daten, die in Azure Cosmos DB geschrieben wurden
Melden Sie sich am Azure-Portal an, und navigieren Sie zu Ihrem Azure Cosmos DB-Konto. Überprüfen Sie, ob die drei Zeilen aus dem Thema „hotels“ in Ihrem Konto erstellt werden.
Bereinigen
Navigieren Sie zu dem von Ihnen erstellten Senkenconnector, und wählen Sie das Symbol Löschen aus, um den Connector aus dem Control Center zu löschen.
Alternativ können Sie zum Löschen die Connect-REST-API verwenden:
# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector
Führen Sie diese Schritte aus, um den erstellten Azure Cosmos DB-Dienst und die zugehörige Ressourcengruppe über die Azure CLI zu löschen.
Konfigurationseigenschaften des Senkenconnectors
Die folgenden Einstellungen werden zum Einrichten eines Azure Cosmos DB Kafka Sink-Connectors verwendet. Mit diesen Konfigurationswerten wird festgelegt, welche Kafka-Themendaten verarbeitet werden, in welche Azure Cosmos DB-Container Daten geschrieben werden und welche Formate zum Serialisieren der Daten verwendet werden. Ein Beispiel für eine Konfigurationsdatei mit den Standardwerten finden Sie in dieser Konfiguration.
Name | Typ | BESCHREIBUNG | Erforderlich/Optional |
---|---|---|---|
Themen | Liste | Dies ist eine Liste der zu beobachtenden Kafka-Themen. | Erforderlich |
connector.class | Schnur | Dies ist der Klassenname der Azure Cosmos DB-Senke. Er sollte auf com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector festgelegt werden. |
Erforderlich |
connect.cosmos.connection.endpoint | URI | Dies ist eine URI-Zeichenfolge für den Azure Cosmos DB-Endpunkt. | Erforderlich |
connect.cosmos.master.key | Schnur | Dies ist der Azure Cosmos DB-Primärschlüssel, mit dem die Senke eine Verbindung herstellt. | Erforderlich |
connect.cosmos.datenbankname | Schnur | Dies ist der Name der Azure Cosmos DB-Datenbank, in die die Senke schreibt. | Erforderlich |
connect.cosmos.containers.topicmap | Schnur | Dies ist eine Zuordnung zwischen Kafka-Themen und Azure Cosmos DB-Containern, die wie gezeigt mit CSV formatiert wird: topic#container,topic2#container2 . |
Erforderlich |
connect.cosmos.connection.gateway.enabled | Boolescher Wert | Flag, um anzugeben, ob der Gatewaymodus verwendet werden soll Der Standardwert ist falsch. | Wahlfrei |
connect.cosmos.sink.bulk.enabled | Boolescher Wert | Flag, um anzugeben, ob der Massenmodus aktiviert ist. Der Standardwert ist TRUE. | Wahlfrei |
connect.cosmos.sink.maxRetryCount | INT | Maximale Anzahl von Wiederholungsversuchen bei vorübergehenden Schreibfehlern. Die Standardeinstellung ist zehnmal. | Wahlfrei |
key.converter | Schnur | Dies ist das Serialisierungsformat für die in das Kafka-Thema geschriebenen Schlüsseldaten. | Erforderlich |
value.converter | Schnur | Dies ist das Serialisierungsformat für die In das Kafka-Thema geschriebenen Wertdaten. | Erforderlich |
key.converter.schemas.enable | Schnur | Legen Sie diese Angabe auf „true“ fest, wenn die Schlüsseldaten ein eingebettetes Schema haben. | Wahlfrei |
value.converter.schemas.enable | Schnur | Legen Sie diese Angabe auf „true“ fest, wenn die Schlüsseldaten ein eingebettetes Schema haben. | Wahlfrei |
tasks.max | INT | Dies ist die maximale Anzahl von Senkenconnectoraufgaben. Die Standardeinstellung ist 1 . |
Wahlfrei |
Daten werden immer als JSON ohne Schema in die Azure Cosmos DB-Instanz geschrieben.
Unterstützte Datentypen
Der Azure Cosmos DB-Senkenconnector konvertiert den Senkendatensatz in ein JSON-Dokument, das die folgenden Schematypen unterstützt:
Schematyp | JSON-Datentyp |
---|---|
Array | Array |
Boolescher Typ (Boolean) | Boolescher Typ (Boolean) |
Float32 | Nummer |
Float64 | Nummer |
Int8 | Nummer |
Int16 | Nummer |
Int32 | Nummer |
Int64 | Nummer |
Karte | Objekt (JSON) |
Schnur | Schnur Null |
Struktur | Objekt (JSON) |
Der Senkenconnector unterstützt auch die folgenden logischen AVRO-Typen:
Schematyp | JSON-Datentyp |
---|---|
Datum | Nummer |
Zeit | Nummer |
Zeitstempel | Nummer |
Hinweis
Die Byte-Deserialisierung wird vom Azure Cosmos DB Sink-Connector derzeit nicht unterstützt.
Single Message Transforms (SMT)
Zusammen mit den Senkenconnectoreinstellungen können Sie die Verwendung von Single Message Transformations (SMTs) zum Ändern von Nachrichten angeben, die die Kafka Connect-Plattform durchlaufen. Weitere Informationen finden Sie in der Confluent-Dokumentation zu SMTs.
Verwenden der SMT „InsertUUID“
Sie können mithilfe der SMT „InsertUUID“ Element-IDs automatisch hinzufügen lassen. Mithilfe der benutzerdefinierten SMT InsertUUID
können Sie das Feld id
mit einem zufälligen UUID-Wert für jede Nachricht einfügen, bevor sie in Azure Cosmos DB geschrieben wird.
Warnung
Verwenden Sie diese SMT nur, wenn Nachrichten das Feld id
nicht enthalten. Andernfalls werden die id
-Werte überschrieben, und Ihre Datenbank enthält möglicherweise doppelte Elemente. Die Verwendung von UUIDs als Nachrichten-ID kann eine schnelle und einfache Methode sein, sie sind aber keine idealen Partitionsschlüssel für die Verwendung in Azure Cosmos DB.
Installieren der SMT
Bevor Sie die SMT InsertUUID
verwenden können, müssen Sie diese Transformation in Ihrem Confluent Platform-Setup installieren. Wenn Sie das Confluent Platform-Setup aus diesem Repository verwenden, ist die Transformation in der Installation bereits enthalten, und Sie können diesen Schritt überspringen.
Alternativ können Sie die InsertUUID-Quelle packen, um eine neue JAR-Datei zu erstellen. Informationen zum manuellen Installieren des Connectors mithilfe der JAR-Datei finden Sie in diesen Anleitungen.
# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid
# package the source code into a JAR file
mvn clean package
# include the following JAR file in Confluent Platform installation
ls target/*.jar
Konfigurieren der SMT
Fügen Sie in Ihrer Senkenconnectorkonfiguration die folgenden Eigenschaften hinzu, um die Eigenschaft id
festzulegen.
"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"
Weitere Informationen zur Verwendung dieser SMT finden Sie im InsertUUID-Repository.
Verwenden von SMTs für die TTL-Konfiguration (Time to live)
Mit den SMTs InsertField
und Cast
können Sie TTL bei jedem in Azure Cosmos DB erstellten Element konfigurieren. Aktivieren Sie TTL für den Container, bevor Sie TTL auf Elementebene aktivieren. Weitere Informationen finden Sie in der TTL-Dokumentation.
Fügen Sie in Ihrer Sink-Connector-Konfiguration die folgenden Eigenschaften hinzu, um den TTL-Wert in Sekunden festzulegen. Im folgenden Beispiel wird der TTL-Wert auf 100 Sekunden festgelegt. Wenn die Nachricht bereits das Feld TTL
enthält, wird der Wert TTL
von diesen SMTs überschrieben.
"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"
Weitere Informationen zur Verwendung dieser SMTs finden Sie in der Dokumentation zu InsertField und Cast.
Behandeln allgemeiner Probleme
Hier finden Sie Lösungen für einige häufige Probleme, die bei der Arbeit mit dem Kafka-Sink-Connector auftreten können.
Lesen von Nicht-JSON-Daten mit „JsonConverter“
Wenn Ihr Quellthema in Kafka Nicht-JSON-Daten enthält und Sie versuchen, diese Daten mithilfe des JsonConverter
zu lesen, wird die folgende Ausnahme angezeigt:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7
Dieser Fehler wird wahrscheinlich dadurch verursacht, dass Daten im Quellthema entweder im Avro-Format oder in einem anderen Format, z. B. einer CSV-Zeichenfolge, serialisiert werden.
Lösung: Wenn die Themendaten im Avro-Format vorliegen, ändern Sie Ihren Kafka Connect Sink-Connector so, dass er wie unten gezeigt AvroConverter
verwendet.
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
Gatewaymodusunterstützung
connect.cosmos.connection.gateway.enabled
ist eine Konfigurationsoption für den Cosmos DB Kafka Sink Connector, der die Datenerfassung mithilfe des Cosmos DB-Gatewaydiensts verbessert. Dieser Dienst fungiert als Front-End für Cosmos DB und bietet Vorteile wie Lastenausgleich, Anforderungsrouting und Protokollübersetzung. Durch die Nutzung des Gatewaydiensts erreicht der Connector einen verbesserten Durchsatz und eine höhere Skalierbarkeit beim Schreiben von Daten in Cosmos DB. Weitere Informationen finden Sie unter den Konnektivitätsmodi.
"connect.cosmos.connection.gateway.enabled": true
Massenmodusunterstützung
Die Eigenschaft connect.cosmos.sink.bulk.enabled
bestimmt, ob das Massenschreibfeature für das Schreiben von Daten aus Kafka-Themen in Azure Cosmos DB aktiviert ist.
Wenn diese Eigenschaft auf true
(standardmäßig) festgelegt ist, wird der Massenschreibmodus aktiviert, sodass Kafka Connect die Massenimport-API von Azure Cosmos DB zum Ausführen effizienter Batchschreibvorgänge mithilfe der Methode CosmosContainer.executeBulkOperations()
verwenden kann. Der Massenschreibmodus verbessert die Schreibleistung erheblich und reduziert die Gesamtlatenz beim Erfassen von Daten in Cosmos DB im Vergleich zum Nicht-Massenvorgangsmodus bei Verwendung der Methode CosmosContainer.upsertItem()
.
Der Massenmodus ist standardmäßig aktiviert. Um die Eigenschaft connect.cosmos.sink.bulk.enabled
zu deaktivieren, müssen Sie sie in der Konfiguration für den Cosmos DB-Senkenconnector auf false
festlegen. Hier sehen Sie ein Beispiel für eine Konfigurationseigenschaftendatei:
"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false
Durch Aktivieren der Eigenschaft connect.cosmos.sink.bulk.enabled
können Sie die Massenschreibfunktion in Kafka Connect für Azure Cosmos DB nutzen, um eine verbesserte Schreibleistung beim Replizieren von Daten aus Kafka-Themen in Azure Cosmos DB zu erzielen.
"connect.cosmos.sink.bulk.enabled": true
Lesen von Nicht-Avro-Daten mit „AvroConverter“
Dieses Szenario trifft zu, wenn Sie versuchen, den Avro-Konverter zum Lesen von Daten aus einem Thema zu verwenden, das nicht im Avro-Format vorliegt. Dabei werden Daten von einem anderen Avro-Serialisierungsmodul als dem Avro-Serialisierungsmodul von „Confluent Schema Registry“ geschrieben, das ein eigenes Sendeformat hat.
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Lösung: Überprüfen Sie das Serialisierungsformat des Quellthemas. Ändern Sie dann den Kafka Connect-Senkenconnector entweder so, dass der richtige Konverter verwendet wird, oder ändern Sie das Upstreamformat in „Avro“.
Lesen einer JSON-Nachricht ohne die erwartete Schema- bzw. Nutzdatenstruktur
Kafka Connect unterstützt eine spezielle Struktur von JSON-Nachrichten, die – wie im Folgenden gezeigt – sowohl die Nutzdaten als auch das Schema enthalten.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "name"
}
]
},
"payload": {
"userid": 123,
"name": "Sam"
}
}
Wenn Sie JSON-Daten zu lesen versuchen, in denen die Daten in dieser Struktur nicht enthalten sind, wird der folgende Fehler angezeigt:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Die einzige für schemas.enable=true
gültige JSON-Struktur enthält also – wie oben gezeigt – Schema- und Nutzdatenfelder als Elemente der obersten Ebene. Wie in der Fehlermeldung beschrieben, sollten Sie die Konfiguration Ihres Connectors wie folgt ändern, wenn Sie nur mit einfachen JSON-Daten arbeiten:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
Einschränkungen
- Das automatische Erstellen von Datenbanken und Containern in Azure Cosmos DB wird nicht unterstützt. Die Datenbanken und Container müssen bereits vorhanden und ordnungsgemäß konfiguriert sein.
Nächste Schritte
Weitere Informationen zum Änderungsfeed in Azure Cosmo DB finden Sie in den folgenden Dokumentationen:
Weitere Informationen zu Massenvorgängen im V4 Java SDK finden Sie in der folgenden Dokumentation: