Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Función con valores de tabla de transmisión por secuencias
Se aplica a:
Databricks SQL
Databricks Runtime 13.3 LTS y versiones posteriores
Devuelve una tabla con registros leídos de Pub/Sub de un tema. Solo admite consultas de streaming.
Sintaxis
read_pubsub( { parameter => value } [, ...])
Argumentos
read_pubsub requiere una invocación de parámetros con nombre.
Los únicos argumentos necesarios son subscriptionId, projectId, y topicId. Todos los demás argumentos son opcionales.
Para obtener descripciones de argumentos completas, vea Opciones de configuración para la lectura de secuencias pub/sub.
Databricks recomienda usar secretos al proporcionar opciones de autorización. Consulte la secret función.
Para obtener más información sobre cómo configurar el acceso a Pub/Sub, vea Configurar el acceso a Pub/Sub.
| Parámetro | Tipo | Descripción |
|---|---|---|
subscriptionId |
STRING |
Obligatorio, el identificador único asignado a una suscripción de Pub/Sub. |
projectId |
STRING |
Obligatorio, el identificador de proyecto de Google Cloud asociado al tema Pub/Sub. |
topicId |
STRING |
Obligatorio, el identificador o el nombre del tema Pub/Sub al que suscribirse. |
clientEmail |
STRING |
Dirección de correo electrónico asociada a una cuenta de servicio para la autenticación. |
clientId |
STRING |
Identificador de cliente asociado a la cuenta de servicio para la autenticación. |
privateKeyId |
STRING |
Identificador de la clave privada asociada a la cuenta de servicio. |
privateKey |
STRING |
Clave privada asociada a la cuenta de servicio para la autenticación. |
Estos argumentos se usan para un ajuste más preciso al leer desde Pub/Sub:
| Parámetro | Tipo | Descripción |
|---|---|---|
numFetchPartitions |
STRING |
Opcional con el número predeterminado de ejecutores. Número de tareas paralelas de Spark que capturan registros de una suscripción. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opcional con predeterminadfalse. Si se establece en true, la suscripción pasada a la secuencia se elimina cuando finaliza el trabajo de streaming. |
maxBytesPerTrigger |
STRING |
Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado. El valor predeterminado es "none". |
maxRecordsPerFetch |
STRING |
Número de registros que se van a capturar por tarea antes de procesar los registros. El valor predeterminado es "1000". |
maxFetchPeriod |
STRING |
Duración de tiempo para cada tarea que se va a capturar antes de procesar los registros. El valor predeterminado es "10s". |
Devoluciones
Tabla de registros Pub/Sub con el esquema siguiente. La columna de atributos podría ser null, pero todas las demás columnas no son null.
| Nombre | Tipo de datos | Anulable | Estándar | Descripción |
|---|---|---|---|---|
messageId |
STRING |
No | Identificador único del mensaje Pub/Sub. | |
payload |
BINARY |
No | Contenido del mensaje Pub/Sub. | |
attributes |
STRING |
Sí | Pares clave-valor que representan los atributos del mensaje Pub/Sub. Se trata de una cadena codificada en json. | |
publishTimestampInMillis |
BIGINT |
No | Marca de tiempo cuando se publicó el mensaje, en milisegundos. | |
sequenceNumber |
BIGINT |
No | Identificador único del registro dentro de su partición. |
Ejemplos
-- Streaming Ingestion from Pubsub using Google Service Account secrets
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
clientEmail => secret('app-events', 'clientEmail'),
clientId => secret('app-events', 'clientId'),
privateKeyId => secret('app-events', 'privateKeyId'),
privateKey => secret('app-events', 'privateKey')
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic'
);
Los datos ahora deben consultarse desde el testing.streaming_table para realizar un análisis posterior.
Consultas erróneas:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project'
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => 'app-events-1234',
projectId => 'app-events-project',
topicId => 'app-events-topic',
maxRecordsPerFetchLimit => '1000001'
);