Nota
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare ad accedere o a cambiare directory.
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare a cambiare directory.
Azure Databricks offre un connettore predefinito per sottoscrivere Google Pub/Sub in Databricks Runtime 13.3 LTS e versioni successive. Questo connettore fornisce semantica di elaborazione di tipo exactly-once per i record del sottoscrittore.
Nota
Pub/Sub potrebbe pubblicare record duplicati e i record potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere il codice di Azure Databricks per gestire i record duplicati e non ordinati.
Esempio di sintassi
Nell'esempio di codice seguente viene illustrata la sintassi di base per la configurazione di una lettura di Structured Streaming (flusso strutturato) da Pub/Sub.
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.
Configurare l'accesso a Pub/Sub
La tabella seguente descrive i ruoli necessari per le credenziali configurate:
| Ruoli | Obbligatorio o facoltativo | Modalità di utilizzo |
|---|---|---|
roles/pubsub.viewer oppure roles/viewer |
Richiesto | Controllare se la sottoscrizione esiste e ottenere la sottoscrizione |
roles/pubsub.subscriber |
Richiesto | Recuperare i dati da una sottoscrizione |
roles/pubsub.editor oppure roles/editor |
Facoltativo | Abilita la creazione di una sottoscrizione se non esiste e consente anche l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso |
Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Per autorizzare una connessione sono necessarie le opzioni seguenti:
clientEmailclientIdprivateKeyprivateKeyId
Schema Pub/Sub
Lo schema del flusso corrisponde ai record ottenuti da Pub/Sub, come descritto nella tabella seguente.
| Campo | TIPO |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurare le opzioni per la lettura in streaming Pub/Sub
Nella tabella seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni vengono configurate come parte di una lettura Structured Streaming usando .option("<optionName>", "<optionValue>") la sintassi.
Nota
Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Ciò riflette i dettagli dell'implementazione interna e le opzioni funzionano in modo analogo a quelle di altri connettori Structured Streaming, ad eccezione del fatto che i record vengono recuperati e quindi elaborati.
| Opzione | Valore predefinito | Descrizione |
|---|---|---|
numFetchPartitions |
Impostare su metà del numero di esecutori presenti all'inizializzazione del flusso. | Numero di attività Spark parallele che recuperano record da una sottoscrizione. |
deleteSubscriptionOnStreamStop |
false |
Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming. |
maxBytesPerTrigger |
Nessuno | Un limite flessibile per la dimensione del batch da elaborare durante ogni micro-batch attivato. |
maxRecordsPerFetch |
1000 | Numero di record da recuperare per ogni attività prima dell'elaborazione dei record. |
maxFetchPeriod |
10 secondi | Durata del tempo di recupero per ogni attività prima dell'elaborazione dei record. Databricks consiglia di usare il valore predefinito. |
Elaborazione semantica batch incrementale per Pub/Sub
È possibile usare Trigger.AvailableNow per consumare i record disponibili dalle origini Pub/Sub in modalità batch incrementale.
Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . I record elaborati dal batch includono tutti i dati recuperati in precedenza e tutti i record appena pubblicati con un timestamp minore del timestamp di inizio del flusso registrato.
Vedere AvailableNow: Elaborazione batch incrementale.
Monitoraggio delle metriche di streaming
Le metriche di stato di Structured Streaming segnalano il numero di record recuperati e pronti per l'elaborazione, le dimensioni dei record recuperati e pronti per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso. Di seguito è riportato un esempio di queste metriche:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limiti
L'esecuzione speculativa (spark.speculation) non è supportata con Pub/Sub.