Options

Questa pagina descrive le opzioni di configurazione per la lettura e la scrittura in Apache Kafka usando Structured Streaming in Azure Databricks.

Il connettore Kafka di Azure Databricks si basa sul connettore Apache Spark Kafka e supporta tutte le opzioni di configurazione Kafka standard. Qualsiasi opzione preceduta da kafka. viene passata direttamente al client Kafka sottostante. Ad esempio, .option("kafka.max.poll.records", "500") imposta la proprietà del max.poll.records consumer Kafka. Vedere la documentazione sulla configurazione di Kafka per l'elenco completo delle proprietà Kafka disponibili.

Per un elenco completo delle opzioni di origine e sink structured streaming, vedere Kafka e Structured Streaming + Kafka Integration Guide (Guida all'integrazione di Structured Streaming + Kafka).

Opzioni obbligatorie

Per informazioni dettagliate sulle opzioni necessarie, vedere Kafka.

Per la lettura e la scrittura è necessaria l'opzione seguente:

Chiave Descrizione
kafka.bootstrap.servers Elenco delimitato da virgole di indirizzi host:port per i broker Kafka. Imposta la proprietà del bootstrap.servers client Kafka.
Se non sono presenti dati di Kafka, controllare l'elenco di indirizzi del broker per gli indirizzi non corretti. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. I client Kafka presuppongono che i broker saranno disponibili alla fine e riprovare per sempre quando ricevono errori di rete.

Per le letture kafka, è necessario specificare esattamente una delle opzioni seguenti per identificare gli argomenti da utilizzare:

  • subscribe
  • subscribePattern
  • assign

Quando si scrive in Kafka, è possibile impostare facoltativamente l'opzione topic per specificare un argomento di destinazione per tutte le righe. Se non è impostato, il dataframe deve includere una topic colonna.

Opzioni comuni per il lettore

Le opzioni seguenti vengono comunemente usate durante la lettura da Kafka:

Chiave Descrizione
minPartitions Numero minimo di partizioni da leggere da Kafka.
maxRecordsPerPartition Numero massimo di record per partizione Spark.
failOnDataLoss Indica se la query non riesce quando è possibile che i dati siano andato persi.
maxOffsetsPerTrigger Numero massimo di offset elaborati per intervallo di trigger.
startingOffsets Offset da cui la query inizia la lettura.
endingOffsets Dove interrompere la lettura per le query batch.
groupIdPrefix Prefisso personalizzato per l'ID gruppo di consumer generato automaticamente.
kafka.group.id ID gruppo da usare durante la lettura da Kafka.
Usare questo comportamento con cautela perché può causare un comportamento imprevisto. Per impostazione predefinita, ogni query genera un ID di gruppo univoco per la lettura dei dati. In questo modo ogni query ha un proprio gruppo di consumer che evita interferenze da altri consumer e consente a ogni query di leggere tutte le partizioni degli argomenti sottoscritti. In alcuni scenari, ad esempio l'autorizzazione basata su gruppo Kafka, è possibile usare ID gruppo autorizzati specifici per leggere i dati.
Le query con lo stesso ID gruppo potrebbero interferire tra loro e leggere solo dati parziali. L'interferenza può verificarsi quando si eseguono carichi di lavoro batch e di streaming simultanei o quando si avviano e riavviano le query in rapida successione.
Per ridurre al minimo i problemi, impostare la configurazione session.timeout.ms del consumer Kafka in modo che sia molto piccola.
includeHeaders Indica se includere le intestazioni del messaggio Kafka nell'output.
bytesEstimateWindowLength Intervallo di tempo usato per stimare i byte rimanenti tramite la estimatedTotalBytesBehindLatest metrica.

Per un elenco completo delle opzioni di origine e sink structured streaming, vedere Kafka e Structured Streaming + Kafka Integration Guide (Guida all'integrazione di Structured Streaming + Kafka).

Opzioni comuni del writer

Le opzioni seguenti vengono comunemente usate durante la scrittura in Kafka:

Chiave Descrizione
topic Imposta l'argomento per tutte le righe. Questa operazione ha la precedenza su qualsiasi topic colonna nei dati.
includeHeaders Indica se includere le intestazioni Kafka nella riga.

Importante

Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se il sink Kafka usa la versione 2.8.0 o successiva con ACL configurati ma senza IDEMPOTENT_WRITE abilitato, le scritture avranno esito negativo. Risolvere questo problema eseguendo l'aggiornamento a Kafka 2.8.0 o versione successiva oppure impostando .option("kafka.enable.idempotence", "false").

Per un elenco completo delle opzioni di origine e sink structured streaming, vedere Kafka e Structured Streaming + Kafka Integration Guide (Guida all'integrazione di Structured Streaming + Kafka).

Opzioni di autenticazione

Azure Databricks supporta più metodi di autenticazione per Kafka, tra cui le credenziali del servizio Unity Catalog, SASL/SSL e le opzioni specifiche del cloud per AWS MSK, Hub eventi di Azure e Google Cloud Managed Kafka.

Azure Databricks consiglia di usare le credenziali del servizio Catalogo Unity per l'autenticazione ai servizi Kafka gestiti dal cloud:

Opzione Descrizione
databricks.serviceCredential Nome di una credenziale del servizio Catalogo Unity per l'autenticazione ai servizi Kafka gestiti dal cloud (AWS MSK, Hub eventi di Azure o Google Cloud Managed Kafka). Disponibile in Databricks Runtime 16.1 e versioni successive.
databricks.serviceCredential.scope Ambito OAuth per le credenziali del servizio. Impostare questa impostazione solo se Azure Databricks non può dedurre automaticamente l'ambito per il servizio Kafka.

Quando si usa una credenziale del servizio Catalogo Unity, non è necessario specificare opzioni SASL/SSL come kafka.sasl.mechanism, kafka.sasl.jaas.configo kafka.security.protocol.

Le opzioni SASL/SSL comuni includono:

Opzione Descrizione
kafka.security.protocol Protocollo usato per comunicare con i broker, SASL_SSLad esempio , SSL, PLAINTEXT.
kafka.sasl.mechanism Meccanismo SASL (ad esempio, PLAIN, SCRAM-SHA-256SCRAM-SHA-512, OAUTHBEARER). AWS_MSK_IAM
kafka.sasl.jaas.config Stringa di configurazione dell'account di accesso JAAS.
kafka.sasl.login.callback.handler.class Nome completo della classe di un gestore di callback di accesso per l'autenticazione SASL.
kafka.sasl.client.callback.handler.class Nome completo della classe di un gestore di callback client per l'autenticazione SASL.
kafka.ssl.truststore.location Percorso del file dell'archivio attendibilità SSL.
kafka.ssl.truststore.password Password per il file dell'archivio attendibilità SSL.
kafka.ssl.keystore.location Percorso del file dell'archivio chiavi SSL.
kafka.ssl.keystore.password Password per il file dell'archivio chiavi SSL.

Per istruzioni complete sull'installazione dell'autenticazione, vedere Autenticazione.

Risorse aggiuntive