Compartir a través de


Función con valores de tabla de transmisión por secuencias read_pulsar

Se aplica a: casilla marcada como sí Databricks SQL casilla marcada como Sí Databricks Runtime 14.1 y versiones posteriores

Importante

Esta característica está en versión preliminar pública.

Devuelve una tabla con los registros leídos desde Pulsar.

Esta función con valores de tabla solo admite la transmisión por secuencias y no la consulta por lotes.

Sintaxis

read_pulsar ( { option_key => option_value } [, ...] )

Argumentos

Esta función requiere la invocación de parámetros con nombre para las claves de opción.

Las opciones serviceUrl y topic son obligatorias.

Las descripciones de los argumentos son breves aquí. Consulte la documentación de Pulsar de flujo estructurado para obtener descripciones ampliadas.

Opción Tipo Valor predeterminado Descripción
serviceUrl STRING Mandatory Identificador URI del servicio Pulsar.
topic STRING Mandatory Tema del que se va a leer.
predefinedSubscription STRING None Nombre de suscripción predefinido usado por el conector para realizar un seguimiento del progreso de la aplicación de spark.
subscriptionPrefix STRING None Prefijo usado por el conector para generar una suscripción aleatoria para realizar un seguimiento del progreso de la aplicación de spark.
pollTimeoutMs LONG 120000 Tiempo de espera para leer mensajes de Pulsar en milisegundos.
failOnDataLoss BOOLEAN true Controla si se produce un error en una consulta cuando se pierden los datos (por ejemplo, los temas se eliminan o los mensajes se eliminan debido a la directiva de retención).
startingOffsets STRING latest El punto de inicio cuando se inicia una consulta, ya sea el más antiguo, el más reciente o una cadena JSON que especifica un desplazamiento concreto. Si es el más reciente, el lector lee los registros más nuevos después de empezar a funcionar. Si es el más antiguo, el lector lee a partir del desplazamiento más antiguo. El usuario también puede especificar una cadena JSON que especifique un desplazamiento específico.
startingTime STRING None Cuando se especifica, el origen de Pulsar leerá los mensajes a partir de la posición del valor de startingTime especificado.

Los siguientes argumentos se usan para la autentificación del cliente de Pulsar:

Opción Tipo Valor predeterminado Descripción
pulsarClientAuthPluginClassName STRING None Nombre del complemento de autenticación.
pulsarClientAuthParams STRING None Parámetros para el complemento de autenticación.
pulsarClientUseKeyStoreTls STRING None Si se va a usar KeyStore para la autenticación tls.
pulsarClientTlsTrustStoreType STRING None Tipo de archivo TrustStore para la autenticación tls.
pulsarClientTlsTrustStorePath STRING None Ruta de acceso del archivo TrustStore para la autenticación tls.
pulsarClientTlsTrustStorePassword STRING None Contraseña de TrustStore para la autenticación tls.

Estos argumentos se usan para la configuración y autentificación del control de admisión de Pulsar, la configuración del administrador de Pulsar solo es necesaria cuando el control de admisión está habilitado (cuando maxBytesPerTrigger está establecido).

Opción Tipo Valor predeterminado Descripción
maxBytesPerTrigger BIGINT None Límite temporal del número máximo de bytes que queremos procesar por microlote. Si se especifica esto, también es necesario especificar admin.url.
adminUrl STRING None Configuración del Pulsar serviceHttpUrl. Solo es necesario cuando se especifica maxBytesPerTrigger.
pulsarAdminAuthPlugin STRING None Nombre del complemento de autenticación.
pulsarAdminAuthParams STRING None Parámetros para el complemento de autenticación.
pulsarClientUseKeyStoreTls STRING None Si se va a usar KeyStore para la autenticación tls.
pulsarAdminTlsTrustStoreType STRING None Tipo de archivo TrustStore para la autenticación tls.
pulsarAdminTlsTrustStorePath STRING None Ruta de acceso del archivo TrustStore para la autenticación tls.
pulsarAdminTlsTrustStorePassword STRING None Contraseña de TrustStore para la autenticación tls.

Devoluciones

Tabla de registros de Pulsar con el esquema siguiente.

  • __key STRING NOT NULL: clave de mensaje de Pulsar.

  • value BINARY NOT NULL: valor de mensaje de Pulsar.

    Nota: En el caso de los temas con el esquema Avro o JSON, en lugar de cargar contenido en un campo de valor binario, el contenido se expandirá para conservar los nombres de campo y los tipos de campo del tema Pulsar.

  • __topic STRING NOT NULL: nombre del tema Pulsar.

  • __messageId BINARY NOT NULL: id. de mensaje de Pulsar.

  • __publishTime TIMESTAMP NOT NULL: tiempo de publicación del mensaje de Pulsar.

  • __eventTime TIMESTAMP NOT NULL: tiempo de evento de mensaje de Pulsar.

  • __messageProperties MAP<STRING, STRING>: propiedades del mensaje de Pulsar.

Ejemplos

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.