Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Use el conector integrado para suscribirse a Google Pub/Sub. Este conector proporciona una semántica de procesamiento una sola vez para los registros del suscriptor.
Nota:
Pub/Sub podría publicar registros duplicados o los registros podrían llegar al suscriptor fuera de orden. Escriba código para controlar los registros duplicados y desordenados.
Configurar una secuencia Pub/Sub
En el ejemplo de código siguiente se muestra la sintaxis básica para configurar una lectura de Structured Streaming desde Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Para obtener más opciones de configuración, consulte Configurar opciones para la lectura de streaming de Pub/Sub.
Configurar acceso a Pub/Sub
Las credenciales que configure deben tener los siguientes roles.
| Funciones | Requerido u Opcional | Cómo se usa el rol |
|---|---|---|
roles/pubsub.viewer o roles/viewer |
Obligatorio | Comprueba si la suscripción existe y obtiene la suscripción. |
roles/pubsub.subscriber |
Obligatorio | Captura datos de una suscripción. |
roles/pubsub.editor o roles/editor |
Opcionales | Habilita la creación de una suscripción si no existe y permite el uso de deleteSubscriptionOnStreamStop para eliminar suscripciones en la terminación del flujo. |
Databricks recomienda usar secretos al proporcionar opciones de autorización. Se requieren las siguientes opciones para autorizar una conexión:
clientEmailclientIdprivateKeyprivateKeyId
Descripción del esquema Pub/Sub
El esquema de la secuencia coincide con los registros que se recuperan de Pub/Sub, como se describe en la tabla siguiente.
| Campo | Tipo |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opciones para la lectura de streaming de Pub/Sub
En la tabla siguiente se describen las opciones admitidas para Pub/Sub. Todas las opciones se configuran como parte de una lectura de Structured Streaming mediante la sintaxis .option("<optionName>", "<optionValue>").
Nota:
Algunas opciones de configuración de Pub/Sub usan el concepto de capturas en lugar de microlotes. Esto refleja los detalles internos de implementación, y las opciones funcionan de forma similar a los corolarios en otros conectores de Structured Streaming, con la excepción de que los registros se capturan y, a continuación, se procesan.
| Opción | Valor predeterminado | Descripción |
|---|---|---|
numFetchPartitions |
Se establece en la mitad del número de ejecutores presentes en la inicialización de la transmisión. | Número de tareas paralelas de Spark que capturan registros de una suscripción. |
deleteSubscriptionOnStreamStop |
false |
Si true, la suscripción pasada al flujo se elimina cuando finaliza el trabajo de streaming. |
maxBytesPerTrigger |
none |
Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado. |
maxRecordsPerFetch |
1000 |
Número de registros que se van a capturar por tarea antes de procesar los registros. |
maxFetchPeriod |
10s |
La duración del tiempo necesario para cada tarea antes de procesar los registros. Acepta una cadena de duración, por ejemplo, 1s durante 1 segundo o 1m durante 1 minuto. Databricks recomienda usar el valor predeterminado. |
Uso del procesamiento por lotes incremental con Pub/Sub
Puede usar Trigger.AvailableNow para consumir los registros disponibles de las fuentes Pub/Sub incrementándose en lotes.
Azure Databricks registra la marca de tiempo cuando se inicia una lectura con el valor Trigger.AvailableNow. Los registros procesados por el lote incluyen todos los datos capturados anteriormente y todos los registros recién publicados con una marca de tiempo menor que la marca de tiempo de inicio del flujo registrada. Para obtener más información, vea AvailableNow: Procesamiento por lotes incremental.
Supervisión de métricas de streaming de Pub/Sub
Las métricas de progreso Structured Streaming notifican el número de registros capturados y listos para procesar, el tamaño de los registros capturados y listos para procesar, y el número de duplicados vistos desde el inicio del flujo. A continuación, se muestra un ejemplo de estas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitaciones
Pub/Sub no admite la ejecución especulativa (spark.speculation).