Iscriviti a Google Pub/Sub

Usare il connettore predefinito per sottoscrivere Google Pub/Sub. Questo connettore fornisce semantica di elaborazione di tipo exactly-once per i record del sottoscrittore.

Nota

Pub/Sub potrebbe pubblicare record duplicati o i record potrebbero arrivare al sottoscrittore non in ordine. Scrivere codice per gestire record duplicati e non ordinati.

Configurare un flusso Pub/Sub

Nell'esempio di codice seguente viene illustrata la sintassi di base per la configurazione di una lettura di Streaming Strutturato da Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.

Configurare l'accesso a Pub/Sub

Le credenziali configurate devono avere i ruoli seguenti.

Ruoli Obbligatorio o facoltativo Modalità di utilizzo del ruolo
roles/pubsub.viewer oppure roles/viewer Richiesto Controlla se la sottoscrizione esiste e ottiene la sottoscrizione.
roles/pubsub.subscriber Richiesto Recupera i dati da una sottoscrizione.
roles/pubsub.editor oppure roles/editor Facoltativo Abilita la creazione di una sottoscrizione se non esiste e consente l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso.

Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Per autorizzare una connessione sono necessarie le opzioni seguenti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Informazioni sullo schema Pub/Sub

Lo schema del flusso è conforme ai record estratti da Pub/Sub, come descritto nella tabella seguente.

Campo TIPO
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurare le opzioni per la lettura in streaming Pub/Sub

Nella tabella seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni vengono configurate come parte di una lettura Structured Streaming usando .option("<optionName>", "<optionValue>") la sintassi.

Nota

Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Ciò riflette i dettagli dell'implementazione interna e le opzioni funzionano in modo analogo a quelle di altri connettori Structured Streaming, ad eccezione del fatto che i record vengono recuperati e quindi elaborati.

Opzione Valore predefinito Descrizione
numFetchPartitions Impostare su metà del numero di esecutori presenti all'inizializzazione del flusso. Numero di attività Spark parallele che recuperano record da una sottoscrizione.
deleteSubscriptionOnStreamStop false Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming.
maxBytesPerTrigger none Un limite flessibile per la dimensione del batch da elaborare durante ogni micro-batch attivato.
maxRecordsPerFetch 1000 Numero di record da recuperare per ogni attività prima dell'elaborazione dei record.
maxFetchPeriod 10s Durata del tempo di recupero per ogni attività prima dell'elaborazione dei record. Accetta una stringa di durata, 1s ad esempio per 1 secondo o 1m per 1 minuto. Databricks consiglia di usare il valore predefinito.

Usare l'elaborazione batch incrementale con Pub/Sub

È possibile usare Trigger.AvailableNow per utilizzare i record disponibili dalle origini Pub/Sub come batch incrementale.

Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . I record elaborati dal batch includono tutti i dati recuperati in precedenza e tutti i record appena pubblicati con un timestamp minore del timestamp di inizio del flusso registrato. Per altre informazioni, vedere AvailableNow: Elaborazione batch incrementale.

Monitorare le metriche di streaming pub/sub

Le metriche di stato di Structured Streaming segnalano il numero di record recuperati e pronti per l'elaborazione, le dimensioni dei record recuperati e pronti per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso. Di seguito è riportato un esempio di queste metriche:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limiti

Pub/Sub non supporta l'esecuzione speculativa (spark.speculation).