Condividi tramite


Kafka Connect per Azure Cosmos DB - Connettore di sink

Kafka Connect per Azure Cosmos DB è un connettore da cui leggere e scrivere dati in Azure Cosmos DB. Il connettore sink di Azure Cosmos DB consente di esportare dati da argomenti Apache Kafka in un database di Azure Cosmos DB. Il connettore esegue il polling dei dati da Kafka per scrivere nei contenitori del database in base alla sottoscrizione degli argomenti.

Prerequisiti

  • Iniziare con la configurazione della piattaforma Confluent perché offre un ambiente completo da usare. Se non si vuole usare Confluent Platform, è necessario installare e configurare Apache Kafka, Kafka Connect. Sarà anche necessario installare e configurare manualmente i connettori di Azure Cosmos DB.
  • Creare un account Azure Cosmos DB, guida alla configurazione del contenitore
  • Shell Bash, testata in GitHub Codespaces, Mac, Ubuntu, Windows con WSL2. Questa shell non funziona in Cloud Shell o WSL1.
  • Scaricare Java 11+
  • Scaricare Maven

Installare il connettore del lavandino

Se si usa la configurazione consigliata della piattaforma Confluent, il connettore sink di Azure Cosmos DB è incluso nell'installazione ed è possibile ignorare questo passaggio.

In caso contrario, è possibile scaricare il file JAR dall'ultimo rilascio oppure impacchettare questo repository per creare un nuovo file JAR. Per installare manualmente il connettore usando il file JAR, vedere queste istruzioni. È anche possibile creare un pacchetto di un nuovo file JAR dal codice sorgente.

# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar

Creare un argomento Kafka e scrivere dati

Se si usa la piattaforma Confluent, il modo più semplice per creare un argomento Kafka consiste nell'usare l'esperienza utente del Centro di controllo fornita. In caso contrario, è possibile creare manualmente un argomento Kafka usando la sintassi seguente:

./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

Per questo scenario si creerà un argomento Kafka denominato "hotels" e si scriveranno dati JSON non incorporati nello schema nell'argomento. Per creare un argomento all'interno del Centro di controllo, vedere la guida a Confluent.

Avviare quindi il producer della console Kafka per scrivere alcuni record nell'argomento "hotels".

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

Nel producer della console immettere:

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

I tre record immessi vengono pubblicati nell'argomento Kafka "hotels" in formato JSON.

Creare il connettore di sink

Creare un connettore di sink di Azure Cosmos DB in Kafka Connect. Il corpo JSON seguente definisce la configurazione per il connettore sink. Assicurarsi di sostituire i valori per connect.cosmos.connection.endpoint e connect.cosmos.master.key, le proprietà che devono essere salvate dalla guida all'installazione di Azure Cosmos DB nei prerequisiti.

Per ulteriori informazioni su ciascuna di queste proprietà di configurazione, consultare le proprietà del sink.

{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": [
      "hotels"
    ],
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "hotels#kafka"
  }
}

Dopo aver compilato tutti i valori, salvare il file JSON da qualche parte in locale. È possibile usare questo file per creare il connettore usando l'API REST.

Creare un connettore usando il Centro di controllo

Un'opzione semplice per creare il connettore consiste nell'scorrere la pagina Web del Centro di controllo. Seguire questa guida all'installazione per creare un connettore dal Centro di controllo. Anziché usare l'opzione DatagenConnector , usare invece il CosmosDBSinkConnector riquadro. Quando si configura il connettore sink, inserire i valori come hai fatto nel file JSON.

In alternativa, nella pagina connettori è possibile caricare il file JSON creato in precedenza usando l'opzione Carica file di configurazione del connettore .

Screenshot dell'opzione

Creare un connettore con l'API REST

Crea il connettore sink utilizzando l'API REST di Connect:

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Confermare i dati scritti in Azure Cosmos DB

Accedere al portale di Azure e passare all'account Azure Cosmos DB. Verifica che nel tuo account vengano creati i tre record del tema "hotels".

Cleanup

Per eliminare il connettore dal Centro di controllo, passare al connettore sink creato e selezionare l'icona Elimina .

Screenshot dell'opzione di eliminazione nella finestra di dialogo del connettore di sink.

In alternativa, usare l'API REST Connect per eliminare:

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

Per eliminare il servizio Azure Cosmos DB creato e il relativo gruppo di risorse usando l'interfaccia della riga di comando di Azure, vedere questi passaggi.

Proprietà di configurazione del sink

Le impostazioni seguenti vengono usate per configurare un connettore sink Kafka di Azure Cosmos DB. Questi valori di configurazione determinano da quali argomenti Kafka vengono consumati i dati, in quale contenitore Azure Cosmos DB vengono scritti i dati e i formati per serializzare i dati. Per un file di configurazione di esempio con i valori predefiniti, fare riferimento a questa configurazione.

Nome TIPO Description Obbligatorio/Facoltativo
Argomenti list Elenco di topic Kafka da monitorare. Obbligatorio
connector.class string Nome della classe del sink di Azure Cosmos DB. Dovrebbe essere impostato su com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector. Obbligatorio
connect.cosmos.connection.endpoint Uri Stringa dell'URI dell'endpoint di Azure Cosmos DB. Obbligatorio
connect.cosmos.master.key string Chiave primaria di Azure Cosmos DB con cui si connette il sink. Obbligatorio
connect.cosmos.databasename string Nome del database di Azure Cosmos DB su cui scrive il sink. Obbligatorio
connect.cosmos.containers.topicmap string Mapping tra gli argomenti Kafka e i contenitori di Azure Cosmos DB, formattati usando CSV come illustrato: topic#container,topic2#container2. Obbligatorio
connect.cosmos.connection.gateway.enabled boolean Flag per indicare se usare la modalità gateway. È "false" per impostazione predefinita. Opzionale
connect.cosmos.sink.bulk.enabled boolean Flag per indicare se la modalità bulk è abilitata. Per impostazione predefinita, è impostato su "true". Opzionale
connect.cosmos.sink.maxRetryCount int Numero massimo di tentativi in caso di errori di scrittura temporanei. Per impostazione predefinita, è 10 volte. Opzionale
convertitore di chiave string Formato di serializzazione per i dati chiave scritti nell'argomento Kafka. Obbligatorio
value.converter string Formato di serializzazione per i dati di valore scritti nell'argomento Kafka. Obbligatorio
key.converter.schemas.enable string Impostare su "true" se i dati della chiave hanno uno schema incorporato. Opzionale
value.converter.schemas.enable string Impostare su "true" se i dati della chiave hanno uno schema incorporato. Opzionale
tasks.max int Numero massimo di attività sink del connettore. L'impostazione predefinita è 1 Opzionale

I dati verranno sempre scritti in Azure Cosmos DB come JSON senza schemi.

Tipi di dati supportati

Il connettore sink di Azure Cosmos DB converte il record sink in un documento JSON che supporta i tipi di schema seguenti:

Tipo di schema Tipo di dati JSON
Array Array
Boolean Boolean
Float32 Number
Float64 Number
Int8 Number
Int16 Number
Int32 Number
Int64 Number
Map Oggetto (JSON)
String String
Null
Struttura Oggetto (JSON)

Il connettore sink supporta anche i tipi logici AVRO seguenti:

Tipo di schema Tipo di dati JSON
Date Number
Time Number
Marca temporale: Number

Annotazioni

La deserializzazione dei byte non è attualmente supportata dal connettore sink di Azure Cosmos DB.

Trasformazioni a messaggio singolo

Oltre alle impostazioni del connettore sink, è possibile specificare l'uso di trasformazioni a messaggio singolo per modificare i messaggi che passano attraverso la piattaforma Kafka Connect. Per altre informazioni, vedere La documentazione di Confluent SMT.

L'uso di InsertUUID SMT

È possibile usare InsertUUID SMT per aggiungere automaticamente gli ID elemento. Con il SMT personalizzato InsertUUID, è possibile inserire il campo id con un valore UUID casuale per ogni messaggio, prima che venga scritto in Azure Cosmos DB.

Avvertimento

Usare questo SMT solo se i messaggi non contengono il campo id. In caso contrario, i id valori verranno sovrascritti ed è possibile che si verifichino elementi duplicati nel database. L'uso di UUID come ID messaggio può essere semplice e rapido, ma non è una chiave di partizione ideale da usare in Azure Cosmos DB.

Installare l'SMT

Prima di poter usare SMT InsertUUID , è necessario installare questa trasformazione nella configurazione di Confluent Platform. Se si usa la configurazione di Confluent Platform da questo repository, la trasformazione è già inclusa nell'installazione ed è possibile ignorare questo passaggio.

In alternativa, è possibile impacchettare l'origine InsertUUID per creare un nuovo file JAR. Per installare manualmente il connettore usando il file JAR, vedere queste istruzioni.

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

Configurare il SMT

All'interno della configurazione del connettore sink, aggiungere le seguenti proprietà per impostare il id.

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

Per ulteriori informazioni sull'utilizzo di questo SMT, consultare il repository InsertUUID.

Uso degli SMT per configurare il tempo di vita (TTL)

Usando sia InsertField che Cast SMT, è possibile configurare la durata TTL per ogni elemento creato in Azure Cosmos DB. Abilitare TTL sul contenitore prima di abilitare TTL per ogni elemento. Per ulteriori informazioni, vedere la documentazione time-to-live.

All'interno della configurazione del connettore di Sink, aggiungere le proprietà seguenti per impostare la durata (TTL) in secondi. In questo esempio, la durata (TTL) è impostata su 100 secondi. Se il messaggio contiene già il campo TTL, il valore TTL verrà sovrascritto da questi SMT.

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

Per ulteriori informazioni sull'uso di questi SMT, consultare la documentazione InsertField e Cast .

Risoluzione dei problemi comuni

Ecco le soluzioni ad alcuni problemi comuni che possono verificarsi quando si usa il connettore sink Kafka.

Leggere dati non JSON con JsonConverter

Se sono presenti dati non JSON nell'argomento di origine in Kafka e si tenta di leggerli usando JsonConverter, verrà visualizzata l'eccezione seguente:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7

Questo errore è probabilmente causato dai dati nell'argomento di origine serializzati in Avro o in un altro formato, ad esempio una stringa CSV.

Soluzione: Se i dati del topic sono in formato AVRO, modificare il connettore di destinazione di Kafka Connect per utilizzare il AvroConverter come illustrato di seguito.

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Supporto della modalità gateway

connect.cosmos.connection.gateway.enabled è un'opzione di configurazione per il connettore sink Kafka di Cosmos DB che migliora l'inserimento dei dati usando il servizio gateway Cosmos DB. Questo servizio funge da front-end per Cosmos DB, offrendo vantaggi come il bilanciamento del carico, il routing delle richieste e la conversione del protocollo. Sfruttando il servizio gateway, il connettore ottiene una maggiore velocità effettiva e scalabilità durante la scrittura di dati in Cosmos DB. Per altre informazioni, vedere Modalità di connettività.

"connect.cosmos.connection.gateway.enabled": true

Supporto per la modalità bulk

connect.cosmos.sink.bulk.enabled la proprietà determina se la funzionalità di scrittura massiva è abilitata per la scrittura di dati dai topic Kafka in Azure Cosmos DB.

Quando questa proprietà è impostata su true (per impostazione predefinita), abilita la modalità di scrittura bulk, consentendo a Kafka Connect di usare l'API di importazione bulk di Azure Cosmos DB per eseguire operazioni di scrittura batch efficienti usando CosmosContainer.executeBulkOperations() il metodo . La modalità di scrittura bulk migliora significativamente le prestazioni di scrittura e riduce la latenza complessiva durante l'inserimento di dati in Cosmos DB, rispetto alla modalità non bulk, quando si utilizza il metodo CosmosContainer.upsertItem().

La modalità bulk è abilitata per impostazione predefinita. Per disabilitare la connect.cosmos.sink.bulk.enabled proprietà, è necessario impostarla false su nella configurazione per il connettore sink di Cosmos DB. Ecco un file di proprietà di configurazione di esempio:

"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false

Abilitando la connect.cosmos.sink.bulk.enabled proprietà, è possibile sfruttare la funzionalità di scrittura in blocco in Kafka Connect per Azure Cosmos DB per ottenere prestazioni di scrittura migliorate durante la replica dei dati dagli argomenti Kafka ad Azure Cosmos DB.

"connect.cosmos.sink.bulk.enabled": true

Leggere i dati non Avro con AvroConverter

Questo scenario è applicabile quando si tenta di usare il convertitore Avro per leggere i dati da un argomento che non è in formato Avro. Questo include i dati scritti da un serializzatore Avro diverso dal serializzatore Avro del Registro Schemi Confluent, che ha un proprio formato di trasmissione.

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Soluzione: controllare il formato di serializzazione dell'argomento di origine. Passare quindi al connettore di sink di Kafka Connect per usare il convertitore destro o impostare il formato upstream su Avro.

Leggere un messaggio JSON senza la struttura di schema/payload prevista

Kafka Connect supporta una struttura speciale di messaggi JSON contenenti sia payload che schema come indicato di seguito.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

Se si tenta di leggere i dati JSON che non contengono i dati in questa struttura, verrà visualizzato l'errore seguente:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Per essere chiari, l'unica struttura JSON valida per schemas.enable=true include campi di schema e payload come elementi di primo livello, come illustrato in precedenza. Come indica il messaggio di errore, se si hanno solo dati JSON semplici, è necessario modificare la configurazione del connettore in:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Limitazioni

  • La creazione automatica di database e contenitori in Azure Cosmos DB non è supportata. Il database e i contenitori devono essere già esistenti e devono essere configurati correttamente.

Passaggi successivi

Per altre informazioni sul feed di modifiche in Azure Cosmo DB, vedere la documentazione seguente:

Per altre informazioni sulle operazioni bulk in Java SDK V4, vedere la documentazione seguente: