Compartir a través de


Suscribirse a Google Pub/Sub

Azure Databricks proporciona un conector integrado para suscribirse a Google Pub/Sub en Databricks Runtime 13.3 LTS y versiones posteriores. Este conector proporciona una semántica de procesamiento una sola vez para los registros del suscriptor.

Nota:

Pub/Sub podría publicar registros duplicados y los registros podrían llegar al suscriptor desordenados. Debe escribir código de Azure Databricks para controlar los registros duplicados y desordenados.

Ejemplo de sintaxis

En el ejemplo de código siguiente se muestra la sintaxis básica para configurar una lectura de Structured Streaming desde Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Para obtener más opciones de configuración, consulte Configurar opciones para la lectura de streaming de Pub/Sub.

Configurar acceso a Pub/Sub

Databricks recomienda usar secretos al proporcionar opciones de autorización. Se requieren las siguientes opciones para autorizar una conexión:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

En la tabla siguiente se describen los roles necesarios para las credenciales configuradas:

Roles Requerido u Opcional Cómo se utiliza
roles/pubsub.viewer o roles/viewer Obligatorio Comprobar si la suscripción existe y obtener la suscripción
roles/pubsub.subscriber Obligatorio Capturar datos de una suscripción
roles/pubsub.editor o roles/editor Opcionales Habilita la creación de una suscripción si no existe y también permite el uso de deleteSubscriptionOnStreamStop para eliminar suscripciones al finalizar el flujo

Esquema Pub/Sub

El esquema del flujo coincide con los registros que se capturan desde Pub/Sub, como se describe en la tabla siguiente:

Campo Tipo
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurar opciones para la lectura de streaming de Pub/Sub

En la tabla siguiente se describen las opciones admitidas para Pub/Sub. Todas las opciones se configuran como parte de una lectura de Structured Streaming mediante la sintaxis .option("<optionName>", "<optionValue>").

Nota:

Algunas opciones de configuración de Pub/Sub usan el concepto de capturas en lugar de microlotes. Esto refleja los detalles internos de implementación, y las opciones funcionan de forma similar a los corolarios en otros conectores de Structured Streaming, con la excepción de que los registros se capturan y, a continuación, se procesan.

Opción Valor predeterminado Descripción
numFetchPartitions Se establece en la mitad del número de ejecutores presentes en la inicialización de la transmisión. Número de tareas paralelas de Spark que capturan registros de una suscripción.
deleteSubscriptionOnStreamStop false Si true, la suscripción pasada al flujo se elimina cuando finaliza el trabajo de streaming.
maxBytesPerTrigger None Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado.
maxRecordsPerFetch 1 000 Número de registros que se van a capturar por tarea antes de procesar los registros.
maxFetchPeriod 10 segundos Duración de tiempo para cada tarea que se va a capturar antes de procesar los registros. Databricks recomienda usar el valor predeterminado.

Semántica de procesamiento por lotes incremental para Pub/Sub

Puede usar Trigger.AvailableNow para consumir registros disponibles de los orígenes Pub/Sub de un lote incremental.

Azure Databricks registra la marca de tiempo cuando se inicia una lectura con el valor Trigger.AvailableNow. Los registros procesados por el lote incluyen todos los datos capturados anteriormente y todos los registros recién publicados con una marca de tiempo menor que la marca de tiempo de inicio del flujo registrada.

Consulte Configuración del procesamiento por lotes incremental.

Supervisar métricas de streaming

Las métricas de progreso Structured Streaming notifican el número de registros capturados y listos para procesar, el tamaño de los registros capturados y listos para procesar, y el número de duplicados vistos desde el inicio del flujo. A continuación, se muestra un ejemplo de estas métricas:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limitaciones

La ejecución especulativa (spark.speculation) no se admite con Pub/Sub.