Compartir a través de


read_kafka función con valores de tabla

Se aplica a: casilla marcada como sí Databricks SQL casilla marcada como Sí Databricks Runtime 13.3 LTS y versiones posteriores

Lee datos de un clúster de Apache Kafka y los devuelve en formato tabular.

Puede leer datos de uno o varios temas de Kafka. Admite tanto las consultas por lotes como la ingesta de streaming.

Sintaxis

read_kafka([option_key => option_value ] [, ...])

Argumentos

Esta función requiere la invocación de parámetros con nombre.

  • option_key: nombre de la opción que se va a configurar. Debe usar el acento grave (`) para las opciones que contienen puntos (.).
  • option_value: expresión constante para establecer la opción. Acepta literales y funciones escalares.

Devoluciones

Registros leídos de un clúster de Apache Kafka con el siguiente esquema:

  • key BINARY: clave del registro de Kafka.
  • value BINARY NOT NULL: valor del registro de Kafka.
  • topic STRING NOT NULL: nombre del tema de Kafka del que se lee el registro.
  • partition INT NOT NULL: identificador de la partición de Kafka de la que se lee el registro.
  • offset BIGINT NOT NULL: número de desplazamiento del registro de la TopicPartition de Kafka.
  • timestamp TIMESTAMP NOT NULL: valor de marca de tiempo del registro. La columna timestampType define a qué corresponde esta marca de tiempo.
  • timestampType INTEGER NOT NULL: tipo de la marca de tiempo que se especifica en la columna timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valores de encabezado proporcionados como parte del registro (si está habilitado).

Ejemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opciones

Encontrará una lista detallada de opciones en la documentación de Apache Spark.

Opciones necesarias

Proporcione la siguiente opción para conectarse al clúster de Kafka.

Opción
bootstrapServers

Tipo: String

Lista separada por comas de pares host/puerto que apuntan al clúster de Kafka.

Valor predeterminado: ninguno

Proporcione solo una de las siguientes opciones para configurar de qué temas de Kafka deben extraerse datos.

Opción
assign

Tipo: String

Cadena JSON que contiene las particiones de temas específicas de las que se van a consumir. Por ejemplo, para '{"topicA":[0,1],"topicB":[2,4]}', se consumirán de las particiones 0 y 1 de topicA.

Valor predeterminado: ninguno
subscribe

Tipo: String

Lista separada por comas de temas de Kafka para leer.

Valor predeterminado: ninguno
subscribePattern

Tipo: String

Expresión regular que coincide con los temas a los que va a suscribirse.

Valor predeterminado: ninguno

Otras opciones

read_kafka se puede usar en consultas por lotes y en consultas de streaming. Las siguientes opciones especifican a qué tipo de consulta se aplican.

Opción
endingOffsets

Tipo: String Tipo de consulta: solo lote

Desplazamientos hasta los que se va a leer para una consulta por lotes, ya sea "latest" para especificar los registros más recientes o una cadena JSON que especifique un desplazamiento final para cada TopicPartition. En JSON, se puede usar -1 como desplazamiento para referirse al más reciente. No se permite -2 (el más antiguo) como desplazamiento.

Valor predeterminado: "latest"
endingOffsetsByTimestamp

Tipo: String Tipo de consulta: solo lote

Cadena JSON que especifica una marca de tiempo final hasta la que se va a leer para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC, por ejemplo,
1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo.
endingOffsetsByTimestamp tiene prioridad sobre endingOffsets.

Valor predeterminado: ninguno
endingTimestamp

Tipo: String Tipo de consulta: solo lote

Valor de cadena de la marca de tiempo en milisegundos desde
1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Si Kafka no devuelve el desplazamiento coincidente, este se establecerá como el más reciente. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Nota: endingTimestamp tiene prioridad sobre endingOffsetsByTimestamp y
endingOffsets.

Valor predeterminado: ninguno
includeHeaders

Tipo: Boolean Tipo de consulta: streaming y lote

Indica si se van a incluir los encabezados de Kafka en la fila.

Valor predeterminado: false
kafka.<consumer_option>

Tipo: String Tipo de consulta: streaming y lote

Las opciones específicas de consumidor de Kafka se pueden pasar con el prefijo kafka.. Estas opciones deben especificarse con acentos graves; de lo contrario, recibirá un error del analizador. Encontrará las opciones en la documentación de Kafka.

Nota: No debe establecer las siguientes opciones con esta función:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Valor predeterminado: ninguno
maxOffsetsPerTrigger

Tipo: Long Tipo de consulta: solo streaming

Número máximo de desplazamientos o filas que se procesan por intervalo del desencadenador. El número total especificado de desplazamientos se dividirá proporcionalmente entre las TopicPartitions.

Valor predeterminado: ninguno
startingOffsets

Tipo: String Tipo de consulta: streaming y lote

Punto inicial cuando se inicia una consulta, ya sea "earliest", que procede de los desplazamientos más antiguos; "latest", que procede de los desplazamientos más recientes; o bien una cadena JSON que especifique un desplazamiento inicial para cada TopicPartition. En JSON, se puede usar -2 como desplazamiento para referirse al más antiguo y -1 al más reciente.

Nota: En el caso de las consultas por lotes, no se permite la más reciente (ya sea implícitamente o mediante el uso de -1 en JSON). Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio.

Valor predeterminado: "latest" para streaming y "earliest" para lote
startingOffsetsByTimestamp

Tipo: String Tipo de consulta: streaming y lote

Cadena JSON que especifica una marca de tiempo inicial para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC; por ejemplo, 1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento seguirá al valor de la opción startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tiene prioridad sobre startingOffsets.

Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio.

Valor predeterminado: ninguno
startingOffsetsByTimestampStrategy

Tipo: String Tipo de consulta: streaming y lote

Esta estrategia se usa cuando el desplazamiento inicial especificado por marca de tiempo (global o por partición) no coincide con el desplazamiento de Kafka que se ha devuelto. Las estrategias disponibles son las siguientes:

* "error": se produce un error en la consulta
* "latest": asigna el desplazamiento más reciente para estas particiones para que Spark pueda leer registros más recientes de estas particiones en microlotes posteriores.

Valor predeterminado: "error"
startingTimestamp

Tipo: String Tipo de consulta: streaming y lote

Valor de cadena de la marca de tiempo en milisegundos desde
1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento seguirá al valor de la opción startingOffsetsByTimestampStrategy.
startingTimestamp tiene prioridad sobre startingOffsetsByTimestamp y startingOffsets.

Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta comenzarán antes.

Valor predeterminado: ninguno

Nota:

El desplazamiento devuelto para cada partición es el desplazamiento más antiguo cuya marca de tiempo es igual o mayor que la marca de tiempo especificada en la partición correspondiente. El comportamiento varía entre las opciones si Kafka no devuelve el desplazamiento coincidente (consulte la descripción de cada opción).

Spark solo pasa la información de la marca de tiempo a KafkaConsumer.offsetsForTimes, no interpreta ni da ningún motivo sobre el valor. Para obtener más información sobre KafkaConsumer.offsetsForTimes, consulte la documentación. Además, el significado de la marca de tiempo de aquí puede variar según la configuración de Kafka (log.message.timestamp.type). Para obtener más información, consulte la documentación de Apache Kafka.