Condividi tramite


Options

Questa pagina descrive le opzioni di configurazione per la lettura e la scrittura in Apache Kafka usando Structured Streaming in Azure Databricks.

Il connettore Kafka di Azure Databricks si basa sul connettore Apache Spark Kafka e supporta tutte le opzioni di configurazione Kafka standard. Qualsiasi opzione preceduta da kafka. viene passata direttamente al client Kafka sottostante. Ad esempio, .option("kafka.max.poll.records", "500") imposta la proprietà del max.poll.records consumer Kafka. Vedere la documentazione sulla configurazione di Kafka per l'elenco completo delle proprietà Kafka disponibili.

Per altre opzioni di origine e sink structured streaming non elencate in questa pagina, vedere Structured Streaming + Kafka Integration Guide (Guida all'integrazione di Structured Streaming + Kafka).

Opzioni obbligatorie

Per la lettura e la scrittura è necessaria l'opzione seguente:

Opzione Valore Descrizione
kafka.bootstrap.servers Elenco delimitato da virgole di host:port Configurazione di Kafka bootstrap.servers . Se non sono presenti dati da Kafka, controllare prima l'elenco di indirizzi del broker. Se l'elenco di indirizzi del broker non è corretto, potrebbero non esserci errori. Ciò è dovuto al fatto che il client Kafka presuppone che alla fine i broker diventino disponibili e, in caso di errori di rete, continui a riprovare indefinitamente.

Quando si legge da Kafka, è necessario specificare anche una delle opzioni seguenti per identificare gli argomenti da utilizzare:

Opzione Valore Descrizione
subscribe Elenco delimitato da virgole di argomenti Elenco di argomenti a cui sottoscrivere.
subscribePattern Stringa regex Java Modello utilizzato per iscriversi agli argomento/i.
assign Stringa JSON {"topicA":[0,1],"topicB":[2,4]} Partizioni di argomenti specifiche da utilizzare.

Quando si scrive in Kafka, è possibile impostare facoltativamente l'opzione topic per specificare un argomento di destinazione per tutte le righe. Se non è impostato, il dataframe deve includere una topic colonna.

Opzioni comuni per il lettore

Le opzioni seguenti vengono comunemente usate durante la lettura da Kafka:

Opzione Valore Default Descrizione
minPartitions INT none Numero minimo di partizioni da leggere da Kafka. In genere, Spark crea una partizione per ogni partizione dell'argomento Kafka. L'impostazione di questo livello superiore suddivide le partizioni Kafka di grandi dimensioni in partizioni Spark più piccole per aumentare il parallelismo. Utile per gestire l'asimmetria o il picco dei carichi di dati. Nota: l'abilitazione di questa reinizializzazione dei consumer Kafka in ogni trigger, che può influire sulle prestazioni quando si usa SSL.
maxRecordsPerPartition LONG none Numero massimo di record per partizione Spark. Quando è impostato, Spark divide le partizioni Kafka in modo che ogni partizione Spark abbia al massimo questo numero di record. Può essere usato con minPartitions, quando entrambi sono impostati, Spark usa qualsiasi risultato in più partizioni.
failOnDataLoss BOOLEAN true Indica se la query non riesce quando è possibile che i dati siano andato persi. Le query possono non riuscire a leggere in modo permanente i dati da Kafka a causa di molti scenari, ad esempio argomenti eliminati, troncamento degli argomenti prima dell'elaborazione e così via. Si tenta di stimare in modo conservativo se i dati sono stati probabilmente persi o meno. A volte questo può causare falsi allarmi. Impostare questa opzione su false se non funziona come previsto o si vuole che la query continui l'elaborazione nonostante la perdita di dati.
maxOffsetsPerTrigger LONG none [Solo streaming] Limite di velocità per il numero massimo di offset elaborati per intervallo di trigger. Il numero totale di offset viene suddiviso proporzionalmente tra le partizioni degli argomenti.
Per un controllo del flusso più avanzato, è anche possibile usare minOffsetsPerTrigger (offset minimi prima dell'attivazione) e maxTriggerDelay (tempo di attesa massimo, impostazione predefinita 15m). Per informazioni dettagliate, vedere la guida all'integrazione di Spark Kafka .
startingOffsets earliestStringa JSON latest, o latest Determina dove iniziare la lettura all'inizio di una query. Usare earliest per leggere i primi offset disponibili, latest per leggere solo i nuovi dati dopo l'avvio del flusso o una stringa JSON per specificare un offset iniziale per ogni partizione di argomento , ad esempio {"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}. Nel codice JSON si -2 riferisce alla versione meno recente e -1 alla versione più recente.
Per le query di streaming, questo vale solo quando viene avviata una nuova query; la ripresa riprende sempre da dove è stata interrotta la query. Le nuove partizioni individuate iniziano da earliest.
Nota: per le query batch, latest (in modo implicito o tramite -1 in JSON) non è consentito. Per iniziare da un timestamp specifico, usare startingTimestamp o startingOffsetsByTimestamp.
endingOffsets latest o stringa JSON latest [Solo Batch] Punto finale al termine di una query batch. Usare latest per leggere fino agli offset più recenti o a una stringa JSON per specificare un offset finale per ogni partizione di argomento, ad esempio {"topicA":{"0":50,"1":-1},"topicB":{"0":-1}}. Nel codice JSON, -1 fa riferimento alla versione più recente. -2 (earlisest) non è consentito. Per terminare invece con un timestamp specifico, usare endingTimestamp o endingOffsetsByTimestamp.
groupIdPrefix STRING spark-kafka-source (streaming) o spark-kafka-relation (batch) Prefisso per l'ID del gruppo di consumer generato automaticamente. Il connettore genera automaticamente un valore univoco group.id per ogni query. Questa opzione personalizza il prefisso dell'ID generato. Ignorato se kafka.group.id è impostato.
kafka.group.id STRING none ID gruppo da usare durante la lettura da Kafka. Usare con cautela. Per impostazione predefinita, ogni query genera un ID di gruppo univoco per la lettura dei dati. In questo modo, ogni query ha un proprio gruppo di consumatori che non subisce interferenze da parte di nessun altro consumatore e pertanto può leggere tutte le partizioni dei topic sottoscritti. In alcuni scenari, ad esempio l'autorizzazione basata su gruppo Kafka, è possibile usare ID gruppo autorizzati specifici per leggere i dati. Facoltativamente, è possibile impostare l'ID gruppo. Tuttavia, eseguire questa operazione con estrema cautela perché può causare comportamenti imprevisti.
  • È probabile che le query in esecuzione simultanea (sia batch che streaming) con lo stesso ID gruppo interferiscano tra loro, causando la sola lettura di parte dei dati da parte di ogni query.
  • Ciò può verificarsi anche quando le query vengono avviate/riavviate in rapida successione. Per ridurre al minimo questi problemi, impostare la configurazione del consumer Kafka session.timeout.ms affinché sia molto piccola.
includeHeaders BOOLEAN false Indica se includere le intestazioni del messaggio Kafka nell'output.
bytesEstimateWindowLength STRING 300s [Solo streaming] Intervallo di tempo usato per stimare i byte rimanenti tramite la estimatedTotalBytesBehindLatest metrica. Accetta stringhe di durata come 10m (10 minuti) o 600s (600 secondi). Vedere Recuperare le metriche Kafka.

Opzioni comuni del writer

Le opzioni seguenti vengono comunemente usate durante la scrittura in Kafka:

Opzione Valore Default Descrizione
topic STRING none Imposta l'argomento per tutte le righe. In questo modo viene eseguito l'override di qualsiasi topic colonna nei dati.
includeHeaders BOOLEAN false Indica se includere le intestazioni Kafka nella riga.

Importante

Databricks Runtime 13.3 LTS e versioni successive include una versione più recente della libreria kafka-clients che abilita le scritture idempotenti per impostazione predefinita. Se il sink Kafka usa la versione 2.8.0 o successiva con ACL configurati ma senza IDEMPOTENT_WRITE abilitato, le scritture avranno esito negativo. Risolvere questo problema eseguendo l'aggiornamento a Kafka 2.8.0 o versione successiva oppure impostando .option("kafka.enable.idempotence", "false").

Opzioni di autenticazione

Azure Databricks supporta più metodi di autenticazione per Kafka, tra cui le credenziali del servizio Unity Catalog, SASL/SSL e le opzioni specifiche del cloud per AWS MSK, Hub eventi di Azure e Google Cloud Managed Kafka.

Azure Databricks consiglia di usare le credenziali del servizio Catalogo Unity per l'autenticazione ai servizi Kafka gestiti dal cloud:

Opzione Valore Descrizione
databricks.serviceCredential STRING Nome di una credenziale del servizio Catalogo Unity per l'autenticazione ai servizi Kafka gestiti dal cloud (AWS MSK, Hub eventi di Azure o Google Cloud Managed Kafka). Disponibile in Databricks Runtime 16.1 e versioni successive.
databricks.serviceCredential.scope STRING Ambito OAuth per le credenziali del servizio. Impostare questa impostazione solo se Azure Databricks non può dedurre automaticamente l'ambito per il servizio Kafka.

Quando si usa una credenziale del servizio Catalogo Unity, non è necessario specificare opzioni SASL/SSL come kafka.sasl.mechanism, kafka.sasl.jaas.configo kafka.security.protocol.

Le opzioni SASL/SSL comuni includono:

Opzione Valore Descrizione
kafka.security.protocol STRING Protocollo usato per comunicare con i broker (ad esempio, SASL_SSL, SSL, PLAINTEXT).
kafka.sasl.mechanism STRING Meccanismo SASL (ad esempio, PLAIN, SCRAM-SHA-256SCRAM-SHA-512, OAUTHBEARER). AWS_MSK_IAM
kafka.sasl.jaas.config STRING Stringa di configurazione dell'account di accesso JAAS.
kafka.sasl.login.callback.handler.class STRING Nome completo della classe di un gestore di callback di accesso per l'autenticazione SASL.
kafka.sasl.client.callback.handler.class STRING Nome completo della classe di un gestore di callback client per l'autenticazione SASL.
kafka.ssl.truststore.location STRING Percorso del file dell'archivio attendibilità SSL.
kafka.ssl.truststore.password STRING Password per il file dell'archivio attendibilità SSL.
kafka.ssl.keystore.location STRING Percorso del file dell'archivio chiavi SSL.
kafka.ssl.keystore.password STRING Password per il file dell'archivio chiavi SSL.

Per istruzioni complete sull'installazione dell'autenticazione, vedere Autenticazione.

Risorse aggiuntive