Condividi tramite


Kafka Connect per Azure Cosmos DB - V2

Kafka Connect è uno strumento per lo streaming scalabile e affidabile dei dati tra Apache Kafka e altri sistemi. Con Kafka Connect è possibile definire connettori che spostano set di dati di grandi dimensioni all'esterno e all'esterno di Kafka. Kafka Connect per Azure Cosmos DB è un connettore da cui leggere e scrivere dati in Azure Cosmos DB.

Semantica dei connettori di sorgente e di destinazione

  • Connettore di origine - Attualmente questo connettore supporta esattamente una volta.

  • Connettore sink: questo connettore supporta completamente la semantica una sola volta.

Versione di Kafka supportata

3.6.0+

Formati di dati supportati

I connettori di origine e sink possono essere configurati per supportare i formati di dati seguenti:

Formato Description
JSON semplice Struttura di record JSON senza uno schema collegato.
JSON con schema Struttura di record JSON con informazioni esplicite sullo schema per garantire che i dati corrispondano al formato previsto.
AVRO Una chiamata di procedura remota orientata alle righe e un framework di serializzazione dei dati sviluppati all'interno del progetto Hadoop di Apache. Usa JSON per definire tipi di dati, protocolli e serializza i dati in un formato binario compatto.

Le impostazioni chiave e valore, incluso il formato e la serializzazione, possono essere configurate in modo indipendente in Kafka. È quindi possibile usare formati di dati diversi per chiavi e valori, rispettivamente. Per soddisfare formati di dati diversi, è disponibile la configurazione del convertitore sia per key.converter che per value.converter.

Esempi di configurazione del convertitore

JSON semplice

Se è necessario usare JSON senza registro schemi per i dati di connessione, usare il JsonConverter file supportato con Kafka. L'esempio seguente mostra le JsonConverter proprietà chiave e valore aggiunte alla configurazione:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

JSON con schema

Impostare le proprietà key.converter.schemas.enable e value.converter.schemas.enable su true in modo che la chiave o il valore vengano considerati come un oggetto JSON composito che contiene sia uno schema interno che i dati. Senza queste proprietà, la chiave o il valore viene considerato come JSON normale.

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Il messaggio risultante a Kafka sarà simile all'esempio seguente, con schema e payload come elementi di primo livello nel codice JSON:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "userid": 123,
    "name": "user's name"
  }
}

Annotazioni

Il messaggio scritto in Azure Cosmos DB è costituito dallo schema e dal payload. Si notino le dimensioni del messaggio, nonché la proporzione di tale messaggio costituita dal payload rispetto allo schema. Lo schema viene ripetuto in ogni messaggio scritto a Kafka. In scenari come questo, è possibile usare un formato di serializzazione come JSON Schema o AVRO, in cui lo schema viene archiviato separatamente e il messaggio contiene solo il payload.

AVRO

Il connettore Kafka supporta il formato dati AVRO. Per usare il formato AVRO, configurare un oggetto AvroConverter in modo che il connettore sappia come usare i dati AVRO. Azure Cosmos DB Kafka Connect viene testato con AvroConverter fornito da Confluent, con licenza Apache 2.0. Se si preferisce, è anche possibile usare un convertitore personalizzato diverso.

Kafka gestisce chiavi e valori in modo indipendente. Specificare le proprietà key.converter e value.converter come richiesto nella configurazione del worker. Quando si usa AvroConverter, aggiungere una proprietà di convertitore aggiuntiva che fornisce l'URL per il Registro di sistema dello schema. L'esempio seguente mostra le proprietà di chiave e valore AvroConverter aggiunte alla configurazione.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

Scegliere un formato di conversione

Di seguito sono riportate alcune considerazioni su come scegliere un formato di conversione:

  • Quando si configura un connettore di origine:

    • Se si vuole che Kafka Connect includa json normale nel messaggio che scrive in Kafka, impostare la configurazione PLAIN JSON .

    • Se si vuole che Kafka Connect includa lo schema nel messaggio che scrive in Kafka, impostare JSON con la configurazione dello schema .

    • Se si vuole che Kafka Connect includa il formato AVRO nel messaggio scritto in Kafka, impostare la configurazione AVRO .

  • Se si usano dati JSON da un topic Kafka in un sink connector, comprendere come i dati JSON sono stati serializzati quando sono stati scritti nel topic Kafka.

    • Se è stato scritto con serializzatore JSON, impostare Kafka Connect per usare il convertitore (org.apache.kafka.connect.json.JsonConverter)JSON.

      • Se i dati JSON sono stati scritti come stringa normale, determinare se i dati includono uno schema o un payload annidato. In caso affermativo, impostare la configurazione JSON con schema.
      • Tuttavia, se si utilizzano dati JSON e questi non hanno uno schema o il costrutto del payload, è necessario indicare al connettore di non cercare uno schema impostando schemas.enable=false secondo la configurazione Plain JSON.
    • Se è stato scritto con il serializzatore AVRO, impostare Kafka Connect per usare il convertitore (io.confluent.connect.avro.AvroConverter) AVRO in base alla configurazione AVRO .

Configurazione

Proprietà di configurazione comuni

I connettori di origine e sink condividono le proprietà di configurazione comuni seguenti:

Nome della proprietà di Config Impostazione predefinita Description
azure.cosmos.account.endpoint None URI dell'endpoint dell'account Cosmos DB
azure.cosmos.account.environment Azure Ambiente Azure dell'account Cosmos DB: Azure, AzureChina, AzureUsGovernment, AzureGermany.
azure.cosmos.account.tenantId "" ID tenant dell'account Cosmos DB. Obbligatorio per l'autenticazione ServicePrincipal.
azure.cosmos.auth.type MasterKey Sono attualmente supportati due tipi di autenticazione: MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.account.key "" Chiave dell'account Cosmos DB (richiesta solo se auth.type è MasterKey)
azure.cosmos.auth.aad.clientId "" Id client/Id applicazione dell'entità servizio. Obbligatorio per l'autenticazione ServicePrincipal.
azure.cosmos.auth.aad.clientSecret "" Segreto client/password del principale del servizio.
azure.cosmos.mode.gateway false Flag per indicare se usare la modalità gateway. Per impostazione predefinita è false, significa che l'SDK usa la modalità diretta.
azure.cosmos.preferredRegionList [] Elenco di aree preferite da usare per un account Cosmos DB in più aree. Si tratta di un valore delimitato da virgole (ad esempio, [East US, West US] o East US, West US) nelle aree preferite da usare come hint. È consigliabile usare un cluster Kafka collocato con l'account Cosmos DB e impostare l'area del cluster Kafka come area preferita.
azure.cosmos.application.name "" Nome dell'applicazione. Viene aggiunto come suffisso per userAgent.
azure.cosmos.throughputControl.enabled false Flag per indicare se il controllo del throughput è abilitato.
azure.cosmos.throughputControl.account.endpoint "" Uri dell'endpoint dell'account di controllo del throughput di Cosmos DB.
azure.cosmos.throughputControl.account.environment Azure Ambiente Azure dell'account Cosmos DB: Azure, AzureChina, AzureUsGovernment, AzureGermany.
azure.cosmos.throughputControl.account.tenantId "" ID tenant dell'account Cosmos DB. Obbligatorio per l'autenticazione ServicePrincipal.
azure.cosmos.throughputControl.auth.type MasterKey Sono attualmente supportati due tipi di autenticazione: MasterKey(PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.throughputControl.account.key "" Chiave dell'account per il controllo della gestione della capacità di Cosmos DB (obbligatoria solo se throughputControl.auth.type è MasterKey).
azure.cosmos.throughputControl.auth.aad.clientId "" Id client/Id applicazione dell'entità servizio. Obbligatorio per l'autenticazione ServicePrincipal.
azure.cosmos.throughputControl.auth.aad.clientSecret "" Segreto client/password del principale del servizio.
azure.cosmos.throughputControl.preferredRegionList [] Elenco di aree preferite da usare per un account Cosmos DB in più aree. Si tratta di un valore delimitato da virgole (ad esempio, [East US, West US] o East US, West US) nelle aree preferite da usare come hint. È consigliabile usare un cluster Kafka collocato con l'account Cosmos DB e impostare l'area del cluster Kafka come area preferita.
azure.cosmos.throughputControl.mode.gateway false Flag per indicare se usare la modalità gateway. Per impostazione predefinita è false, significa che l'SDK usa la modalità diretta.
azure.cosmos.throughputControl.group.name "" Nome del gruppo di controllo del throughput. Poiché il cliente può creare molti gruppi per un contenitore, il nome deve essere univoco.
azure.cosmos.throughputControl.targetThroughput -1 Obiettivo del gruppo di controllo del throughput. Il valore deve essere maggiore di 0.
azure.cosmos.throughputControl.targetThroughputThreshold -1 Soglia di throughput del gruppo di controllo della larghezza di banda di destinazione. Il valore deve essere compreso tra (0,1].
azure.cosmos.throughputControl.priorityLevel None Livello di priorità del gruppo di controllo della portata. Il valore può essere None, High o Low.
azure.cosmos.throughputControl.globalControl.database.name "" Database usato per il controllo globale del flusso dati.
azure.cosmos.throughputControl.globalControl.container.name "" Contenitore usato per il controllo globale della velocità effettiva.
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 Questo controlla la frequenza con cui il client aggiornerà l'utilizzo della velocità effettiva di se stesso e regola la propria condivisione di velocità effettiva in base all'utilizzo della velocità effettiva di altri client. Il valore predefinito è 5 s, il valore minimo consentito è 5 s.
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 Questo controlla la rapidità con cui rileviamo che il client è offline e consente quindi agli altri client di prendere la sua quota di larghezza di banda. Il valore predefinito è 11 s, il valore minimo consentito è 2 * renewIntervalInMS + 1.

Per la configurazione specifica del connettore sink, vedere la documentazione del connettore sink

Per la configurazione specifica del connettore di origine, vedere la documentazione del connettore di origine

Errori comuni di configurazione

Se i convertitori vengono configurati in modo errato in Kafka Connect, possono verificarsi errori. Questi errori vengono visualizzati nel connettore perché si tenta di deserializzare i messaggi già archiviati in Kafka. I problemi del convertitore non si verificano in genere nell'origine perché la serializzazione è impostata nell'origine.

Per altre informazioni, vedere la documentazione sugli errori di configurazione comuni .

Configurazione del progetto

Per istruzioni di configurazione iniziali, vedere l'installazione per sviluppatori .

Risorse

Passaggi successivi