Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usare il connettore predefinito per sottoscrivere Google Pub/Sub. Questo connettore fornisce semantica di elaborazione di tipo exactly-once per i record del sottoscrittore.
Nota
Pub/Sub potrebbe pubblicare record duplicati o i record potrebbero arrivare al sottoscrittore non in ordine. Scrivere codice per gestire record duplicati e non ordinati.
Configurare un flusso Pub/Sub
Nell'esempio di codice seguente viene illustrata la sintassi di base per la configurazione di una lettura di Streaming Strutturato da Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.
Configurare l'accesso a Pub/Sub
Le credenziali configurate devono avere i ruoli seguenti.
| Ruoli | Obbligatorio o facoltativo | Modalità di utilizzo del ruolo |
|---|---|---|
roles/pubsub.viewer oppure roles/viewer |
Richiesto | Controlla se la sottoscrizione esiste e ottiene la sottoscrizione. |
roles/pubsub.subscriber |
Richiesto | Recupera i dati da una sottoscrizione. |
roles/pubsub.editor oppure roles/editor |
Facoltativo | Abilita la creazione di una sottoscrizione se non esiste e consente l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso. |
Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Per autorizzare una connessione sono necessarie le opzioni seguenti:
clientEmailclientIdprivateKeyprivateKeyId
Informazioni sullo schema Pub/Sub
Lo schema del flusso è conforme ai record estratti da Pub/Sub, come descritto nella tabella seguente.
| Campo | TIPO |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurare le opzioni per la lettura in streaming Pub/Sub
Nella tabella seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni vengono configurate come parte di una lettura Structured Streaming usando .option("<optionName>", "<optionValue>") la sintassi.
Nota
Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Ciò riflette i dettagli dell'implementazione interna e le opzioni funzionano in modo analogo a quelle di altri connettori Structured Streaming, ad eccezione del fatto che i record vengono recuperati e quindi elaborati.
| Opzione | Valore predefinito | Descrizione |
|---|---|---|
numFetchPartitions |
Impostare su metà del numero di esecutori presenti all'inizializzazione del flusso. | Numero di attività Spark parallele che recuperano record da una sottoscrizione. |
deleteSubscriptionOnStreamStop |
false |
Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming. |
maxBytesPerTrigger |
none |
Un limite flessibile per la dimensione del batch da elaborare durante ogni micro-batch attivato. |
maxRecordsPerFetch |
1000 |
Numero di record da recuperare per ogni attività prima dell'elaborazione dei record. |
maxFetchPeriod |
10s |
Durata del tempo di recupero per ogni attività prima dell'elaborazione dei record. Accetta una stringa di durata, 1s ad esempio per 1 secondo o 1m per 1 minuto. Databricks consiglia di usare il valore predefinito. |
Usare l'elaborazione batch incrementale con Pub/Sub
È possibile usare Trigger.AvailableNow per utilizzare i record disponibili dalle origini Pub/Sub come batch incrementale.
Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . I record elaborati dal batch includono tutti i dati recuperati in precedenza e tutti i record appena pubblicati con un timestamp minore del timestamp di inizio del flusso registrato. Per altre informazioni, vedere AvailableNow: Elaborazione batch incrementale.
Monitorare le metriche di streaming pub/sub
Le metriche di stato di Structured Streaming segnalano il numero di record recuperati e pronti per l'elaborazione, le dimensioni dei record recuperati e pronti per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso. Di seguito è riportato un esempio di queste metriche:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limiti
Pub/Sub non supporta l'esecuzione speculativa (spark.speculation).