Condividi tramite


Kafka Connect per Azure Cosmos DB - connettore sink

SI APPLICA A: NoSQL

Kafka Connect per Azure Cosmos DB è un connettore da cui leggere e scrivere dati in Azure Cosmos DB. Il connettore sink di Azure Cosmos DB consente di esportare dati da argomenti di Apache Kafka in un database di Azure Cosmos DB. Il connettore esegue il polling dei dati da Kafka per scrivere nei contenitori del database in base alla sottoscrizione degli argomenti.

Prerequisiti

  • Iniziare con la configurazione della piattaforma Confluent poiché offre un ambiente completo da usare. Se non si desidera usare la piattaforma Confluent, è necessario installare e configurare in autonomia Apache Kafka e Kafka Connect. Sarà inoltre necessario installare e configurare manualmente i connettori di Azure Cosmos DB.
  • Creare un account Azure Cosmos DB, guida alla configurazione di contenitori
  • Shell Bash, testata in GitHub Codespaces, Mac, Ubuntu, Windows con WSL2. Questa shell non funziona in Cloud Shell o WSL1.
  • Scaricare Java versione 11 o successiva
  • Scaricare Maven

Installare il connettore sink

Se si sta utilizzando la configurazione consigliata della piattaforma Confluent, il connettore sink di Azure Cosmos DB è incluso nell'installazione ed è possibile ignorare questo passaggio.

In caso contrario, è possibile scaricare il file JAR dalla versione o dal pacchetto più recente del repository per creare un nuovo file JAR. Per installare manualmente il connettore usando il file JAR, consultare queste istruzioni. È anche possibile creare un pacchetto di un nuovo file JAR dal codice sorgente.

# clone the kafka-connect-cosmosdb repo if you haven't done so already
git clone https://github.com/microsoft/kafka-connect-cosmosdb.git
cd kafka-connect-cosmosdb

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Kafka Connect installation
ls target/*dependencies.jar

Creare un argomento Kafka e scrivere dati

Se si usa la piattaforma Confluent, il modo più semplice per creare un argomento Kafka consiste nell'utilizzare l'esperienza utente del Centro di controllo fornita. In alternativa, è possibile creare manualmente un argomento Kafka usando la sintassi seguente:

./kafka-topics.sh --create --boostrap-server <URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

Per questo scenario, si creerà un argomento Kafka denominato "hotel" e nell'argomento si scriveranno dati JSON non incorporati nello schema. Per creare un argomento all'interno del Centro di controllo, consultare la guida alla piattaforma Confluent.

Successivamente, avviare il producer della console Kafka per scrivere alcuni record nell'argomento "hotel".

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

Nel producer della console immettere:

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

I tre record immessi vengono pubblicati nell'argomento Kafka intitolato "hotel" in formato JSON.

Creare il connettore sink

Creare un connettore sink di Azure Cosmos DB in Kafka Connect. Il corpo JSON seguente definisce la configurazione per il connettore sink. Assicurarsi di sostituire i valori per connect.cosmos.connection.endpoint e connect.cosmos.master.key, proprietà che è necessario salvare dalla guida alla configurazione di Azure Cosmos DB nei prerequisiti.

Per altre informazioni su ognuna di queste proprietà di configurazione, consultare Proprietà sink.

{
  "name": "cosmosdb-sink-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "tasks.max": "1",
    "topics": [
      "hotels"
    ],
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "hotels#kafka"
  }
}

Dopo aver compilato tutti i valori, salvare il file JSON in locale. È possibile usare questo file per creare il connettore usando l'API REST.

Creare il connettore usando Control Center

Un'opzione semplice per creare il connettore consiste nel consultare la pagina Web del Centro di controllo. Seguire questa guida all'installazione per creare un connettore dal Centro di controllo. Anziché usare l'opzione DatagenConnector, usare il riquadro CosmosDBSinkConnector. Quando si configura il connettore sink, inserire i valori che sono stati compilati nel file JSON.

In alternativa, nella pagina connettori è possibile caricare il file JSON creato in precedenza usando l'opzione Carica file di configurazione del connettore.

Screenshot dell'opzione

Creare il connettore usando l'API REST

Creare il connettore sink usando l'API REST Connect:

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Confermare i dati scritti in Azure Cosmos DB

Accedere al portale di Azure e passare all'account Azure Cosmos DB. Verificare che nell'account siano stati creati i tre record dell'argomento "hotel".

Pulizia

Per eliminare il connettore dal Centro di controllo, passare al connettore sink creato e selezionare l'icona Elimina.

Screenshot dell'opzione di eliminazione nella finestra di dialogo del connettore sink.

In alternativa, usare l'API REST Connect per eliminare il connettore:

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

Per eliminare il servizio Azure Cosmos DB creato e il relativo gruppo di risorse usando l'interfaccia della riga di comando di Azure, seguire questi passaggi.

Proprietà di configurazione del connettore sink

Le impostazioni seguenti servono per configurare un connettore sink Kafka di Azure Cosmos DB. Questi valori di configurazione determinano quali dati degli argomenti Kafka vengono utilizzati, in quali dati del contenitore di Azure Cosmos DB vengono scritti e i formati per serializzare i dati. Per ottenere un file di configurazione di esempio con i valori predefiniti, vedere questa configurazione.

Nome Tipo Descrizione Obbligatorio/facoltativo
Argomenti list Un elenco di argomenti Kafka da consultare. Richiesto
connector.class string Nome della classe del sink di Azure Cosmos DB. Dovrebbe essere impostato su com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector. Richiesto
connect.cosmos.connection.endpoint uri Stringa URI dell'endpoint di Azure Cosmos DB. Richiesto
connect.cosmos.master.key string Chiave primaria di Azure Cosmos DB con cui si connette il sink. Richiesto
connect.cosmos.databasename string Nome dell'account del database Azure Cosmos DB in cui scrive il sink. Richiesto
connect.cosmos.containers.topicmap string Mapping tra argomenti Kafka e contenitori di Azure Cosmos DB, formattati utilizzando un file CSV come illustrato: topic#container,topic2#container2. Richiesto
connect.cosmos.connection.gateway.enabled boolean Flag per indicare se usare la modalità gateway. Per impostazione predefinita, il valore è impostato su "false". Facoltativo
connect.cosmos.sink.bulk.enabled boolean Flag per indicare se la modalità bulk è abilitata. Per impostazione predefinita, il valore è impostato su "true". Facoltativo
connect.cosmos.sink.maxRetryCount int Numero massimo di tentativi in caso di errori di scrittura temporanei. Per impostazione predefinita, sono concessi 10 tentativi. Facoltativo
key.converter string Formato di serializzazione per i dati della chiave scritti nell'argomento Kafka. Richiesto
value.converter string Formato di serializzazione per i dati del valore scritti nell'argomento Kafka. Richiesto
key.converter.schemas.enable string Impostare su "true" se i dati della chiave hanno uno schema incorporato. Facoltativo
value.converter.schemas.enable string Impostare su "true" se i dati della chiave hanno uno schema incorporato. Facoltativo
tasks.max int Numero massimo di attività del connettore sink. L'impostazione predefinita è 1 Facoltativo

I dati verranno sempre scritti in Azure Cosmos DB come file JSON senza schemi.

Tipi di dati supportati

Il connettore sink di Azure Cosmos DB converte il record sink in un documento JSON che supporta i seguenti tipi di schema:

Tipo di schema Tipo di dati JSON
Matrice Matrice
Booleano Booleano
Float32 Numero
Float64 Numero
Int8 Numero
Int16 Numero
Int32 Numero
Int64 Numero
Mapping Oggetto (JSON)
String String
Null
Struct Oggetto (JSON)

Il connettore sink supporta anche i seguenti tipi logici in formato AVRO:

Tipo di schema Tipo di dati JSON
Data Numero
Time Numero
Timestamp: Numero

Nota

La deserializzazione dei byte non è attualmente supportata dal connettore sink di Azure Cosmos DB.

Trasformazioni a messaggio singolo (SMT)

Oltre alle impostazioni del connettore sink, è possibile specificare l'uso di trasformazioni a messaggio singolo (SMT) per modificare i messaggi che passano attraverso la piattaforma Kafka Connect. Per ottenere maggiori informazioni, consultare la documentazione di Confluent SMT.

Uso della funzionalità SMT InsertUUID

È possibile usare la funzionalità SMT InsertUUID per aggiungere automaticamente ID elemento. Con la funzionalità SMT InsertUUID personalizzata, è possibile inserire il campo id con un valore UUID casuale per ogni messaggio, prima che venga scritto in Azure Cosmos DB.

Avviso

Utilizzare questa funzionalità SMT solo se i messaggi non contengono il campo id. In alternativa, i valori id verranno sovrascritti ed è possibile trovare elementi duplicati nel database. L'uso di UUID come ID messaggio può essere semplice e rapido, ma non è una chiave di partizione ideale da usare in Azure Cosmos DB.

Installare la funzionalità SMT

Prima di poter usare la funzionalità SMT InsertUUID, è necessario installare questa trasformazione nella configurazione della piattaforma Confluent. Se si usa la configurazione della piattaforma Confluent da questo repository, la trasformazione è già inclusa nell'installazione ed è possibile ignorare questo passaggio.

In alternativa, è possibile creare un pacchetto dell'origine InsertUUID per creare un nuovo file JAR. Per installare manualmente il connettore usando il file JAR, consultare queste istruzioni.

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

Configurare la funzionalità SMT

All'interno della configurazione del connettore sink, aggiungere le proprietà seguenti per impostare il valore id.

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

Per altre informazioni sull'uso di questa funzionalità SMT, consultare il repository InsertUUID.

Utilizzare la funzionalità SMT per configurare la durata (TTL)

Usando le funzionalità SMT InsertField e Cast, è possibile configurare la durata (TTL) per ogni elemento creato in Azure Cosmos DB. Abilitare la durata (TTL) nel contenitore prima di abilitare la durata a livello di elemento. Per ottenere ulteriori informazioni, consultare la documentazione relativa alla durata (TTL).

All'interno della configurazione del connettore sink, aggiungere le proprietà seguenti per impostare la durata in secondi. Nell'esempio riportato di seguito, la durata (TTL) è impostata su 100 secondi. Se il messaggio contiene già il campo TTL, il valore TTL verrà sovrascritto da tali funzionalità SMT.

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

Per altre informazioni sull'uso di queste funzionalità SMT, consultare la documentazione InsertField e Cast.

Risoluzione dei problemi comuni

Ecco le soluzioni ad alcuni problemi comuni che possono verificarsi quando si usa il connettore sink Kafka.

Lettura di dati non JSON con JsonConverter

Se si dispone di dati non JSON nell'argomento di origine in Kafka e si tenta di leggerli usando JsonConverter, si visualizzerà la seguente eccezione:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
...
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7

Questo errore è probabilmente causato dai dati all'interno dell'argomento di origine serializzati in Avro o in un altro formato, ad esempio una stringa CSV.

Soluzione: se i dati dell'argomento sono in formato AVRO, modificare il connettore sink Kafka Connect per usare il AvroConverter come illustrato di seguito.

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Supporto della modalità Gateway

connect.cosmos.connection.gateway.enabled è un'opzione di configurazione per il connettore sink Kafka di Cosmos DB che migliora l'inserimento dei dati utilizzando il servizio gateway di Cosmos DB. Questo servizio funge da front-end per Cosmos DB, offrendo vantaggi come il bilanciamento del carico, il routing delle richieste e la conversione di protocollo. Sfruttando il servizio gateway, il connettore ottiene velocità effettiva e scalabilità migliorate durante la scrittura di dati in Cosmos DB. Per ottenere maggiori informazioni, consultare Modalità di connettività.

"connect.cosmos.connection.gateway.enabled": true

Supporto della modalità in bulk

La proprietà connect.cosmos.sink.bulk.enabled determina se la funzionalità di scrittura bulk è abilitata per la scrittura di dati da argomenti Kafka in Azure Cosmos DB.

Quando questa proprietà è impostata su true (impostazione predefinita), abilita la modalità di scrittura bulk, consentendo a Kafka Connect di usare l'API di importazione in blocco di Azure Cosmos DB per eseguire scritture batch efficienti utilizzando il metodo CosmosContainer.executeBulkOperations(). La modalità di scrittura bulk migliora significativamente le prestazioni di scrittura e riduce la latenza complessiva durante l'inserimento di dati in Cosmos DB rispetto alla modalità non bulk quando viene utilizzato il metodo CosmosContainer.upsertItem().

La modalità bulk è abilitata per impostazione predefinita. Per disabilitare la proprietà connect.cosmos.sink.bulk.enabled, è necessario impostarla su false nella configurazione per il connettore sink di Cosmos DB. Ecco un esempio di file di proprietà di configurazione:

"name": "my-cosmosdb-connector",
"connector.class": "io.confluent.connect.azure.cosmosdb.CosmosDBSinkConnector",
"tasks.max": 1,
"topics": "my-topic"
"connect.cosmos.endpoint": "https://<cosmosdb-account>.documents.azure.com:443/"
"connect.cosmos.master.key": "<cosmosdb-master-key>"
"connect.cosmos.database": "my-database"
"connect.cosmos.collection": "my-collection"
"connect.cosmos.sink.bulk.enabled": false

Abilitando la proprietà connect.cosmos.sink.bulk.enabled, è possibile sfruttare la funzionalità di scrittura bulk in Kafka Connect per Azure Cosmos DB per ottenere prestazioni di scrittura migliorate durante la replicazione di dati dagli argomenti Kafka ad Azure Cosmos DB.

"connect.cosmos.sink.bulk.enabled": true

Lettura di dati non in formato Avro con AvroConverter

Questo scenario è applicabile quando si tenta di usare il convertitore Avro per leggere i dati provenienti da un argomento non in formato Avro. Tale scenario include i dati scritti da un serializzatore Avro diverso dal serializzatore Avro del Registro schemi Confluent, il quale ha un proprio formato di collegamento.

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
...
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Soluzione: controllare il formato di serializzazione dell'argomento d'origine. Passare quindi al connettore sink di Kafka Connect per usare il convertitore corretto o modificare il formato upstream in Avro.

Leggere un messaggio JSON senza la struttura di schema/payload prevista

Kafka Connect supporta una struttura speciale di messaggi JSON contenenti sia payload che schemi come indicato di seguito.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

Se si tenta di leggere dati JSON che non contengono dati in questa struttura, si visualizzerà il seguente errore:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Per fare chiarezza, l'unica struttura JSON valida per la funzione schemas.enable=true include campi di schema e di payload come elementi di primo livello, come illustrato in precedenza. Come indicato dal messaggio di errore, se si hanno solo dati JSON semplici, è necessario modificare la configurazione del connettore come illustrato di seguito:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

Limiti

  • La creazione automatica di database e contenitori in Azure Cosmos DB non è supportata. Il database e i contenitori devono essere già esistenti nonché configurati correttamente.

Passaggi successivi

Per altre informazioni sul feed di modifiche in Azure Cosmo DB con la documentazione seguente:

È possibile ottenere ulteriori informazioni sulle operazioni bulk in V4 Java SDK consultando la documentazione seguente: