Compartir vía


read_kafka función con valores de tabla

Se aplica a:casilla marcada como sí Databricks SQL casilla marcada como Sí Databricks Runtime 13.3 LTS y versiones posteriores

Lee datos de un clúster de Apache Kafka y los devuelve en formato tabular.

Puede leer datos de uno o varios temas de Kafka. Admite tanto las consultas por lotes como la ingesta de streaming.

Sintaxis

read_kafka([option_key => option_value ] [, ...])

Argumentos

Esta función requiere la invocación de parámetros con nombre.

  • option_key: nombre de la opción que se va a configurar. Debe usar comillas invertidas () for options that contain dots ().
  • option_value: expresión constante para establecer la opción. Acepta literales y funciones escalares.

Devoluciones

Registros leídos de un clúster de Apache Kafka con el siguiente esquema:

  • key BINARY: clave del registro de Kafka.
  • value BINARY NOT NULL: valor del registro de Kafka.
  • topic STRING NOT NULL: nombre del tema de Kafka del que se lee el registro.
  • partition INT NOT NULL: identificador de la partición de Kafka de la que se lee el registro.
  • offset BIGINT NOT NULL: número de desplazamiento del registro de la TopicPartition de Kafka.
  • timestamp TIMESTAMP NOT NULL: valor de marca de tiempo del registro. La columna timestampType define a qué corresponde esta marca de tiempo.
  • timestampType INTEGER NOT NULL: tipo de la marca de tiempo que se especifica en la columna timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valores de encabezado proporcionados como parte del registro (si está habilitado).

Ejemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opciones

Encontrará una lista detallada de opciones en la documentación de Apache Spark.

Opciones necesarias

Proporcione la siguiente opción para conectarse al clúster de Kafka.

Opción
bootstrapServers
Tipo: String
Lista separada por comas de pares host/puerto que apuntan al clúster de Kafka.
Valor predeterminado: ninguno

Proporcione solo una de las siguientes opciones para configurar de qué temas de Kafka deben extraerse datos.

Opción
assign
Tipo: String
Cadena JSON que contiene las particiones de temas específicas de las que se van a consumir. Por ejemplo, para '{"topicA":[0,1],"topicB":[2,4]}'las particiones 0 y 1ª del temaA se consumirán.
Valor predeterminado: ninguno
subscribe
Tipo: String
Lista separada por comas de temas de Kafka para leer.
Valor predeterminado: ninguno
subscribePattern
Tipo: String
Expresión regular que coincide con los temas a los que va a suscribirse.
Valor predeterminado: ninguno

Otras opciones

read_kafka se puede usar en consultas por lotes y en consultas de streaming. Las siguientes opciones especifican a qué tipo de consulta se aplican.

Opción
endingOffsets
Tipo: String Tipo de consulta: solo lote
Desplazamientos hasta los que se va a leer para una consulta por lotes, ya sea "latest" para especificar los registros más recientes o una cadena JSON que especifique un desplazamiento final para cada TopicPartition. En JSON, se puede usar -1 como desplazamiento para referirse al más reciente. No se permite -2 (el más antiguo) como desplazamiento.
Valor predeterminado: "latest"
endingOffsetsByTimestamp
Tipo: String Tipo de consulta: solo lote
Cadena JSON que especifica una marca de tiempo final hasta la que se va a leer para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC, por ejemplo,
1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo.
endingOffsetsByTimestamp tiene prioridad sobre endingOffsets.
Valor predeterminado: ninguno
endingTimestamp
Tipo: String Tipo de consulta: solo lote
Valor de cadena de la marca de tiempo en milisegundos desde
1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Si Kafka no devuelve el desplazamiento coincidente, el desplazamiento se establecerá en más reciente. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Nota: endingTimestamp tiene prioridad sobre endingOffsetsByTimestamp y
endingOffsets.
Valor predeterminado: ninguno
includeHeaders
Tipo: Boolean Tipo de consulta: streaming y lote
Indica si se van a incluir los encabezados de Kafka en la fila.
Valor predeterminado: false
kafka.<consumer_option>
Tipo: String Tipo de consulta: streaming y lote
Las opciones específicas de consumidor de Kafka se pueden pasar con el prefijo kafka.. Estas opciones deben especificarse con acentos graves; de lo contrario, recibirá un error del analizador. Encontrará las opciones en la documentación de Kafka.
Nota: No debe establecer las siguientes opciones con esta función:
key.deserializer, value.deserializer, , bootstrap.servers, group.id
Valor predeterminado: ninguno
maxOffsetsPerTrigger
Tipo: Long Tipo de consulta: solo streaming
Número máximo de desplazamientos o filas que se procesan por intervalo del desencadenador. El número total especificado de desplazamientos se dividirá proporcionalmente entre las TopicPartitions.
Valor predeterminado: ninguno
startingOffsets
Tipo: String Tipo de consulta: streaming y lote
Punto inicial cuando se inicia una consulta, ya sea "earliest", que procede de los desplazamientos más antiguos; "latest", que procede de los desplazamientos más recientes; o bien una cadena JSON que especifique un desplazamiento inicial para cada TopicPartition. En JSON, se puede usar -2 como desplazamiento para referirse al más antiguo y -1 al más reciente.
Nota: En el caso de las consultas por lotes, no se permite la más reciente (ya sea implícitamente o mediante el uso de -1 en JSON). Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio.
Valor predeterminado: "latest" para streaming y "earliest" para lote
startingOffsetsByTimestamp
Tipo: String Tipo de consulta: streaming y lote
Cadena JSON que especifica una marca de tiempo inicial para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC; por ejemplo, 1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento se ajustará al valor de la opción startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tiene prioridad sobre startingOffsets.
Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio.
Valor predeterminado: ninguno
startingOffsetsByTimestampStrategy
Tipo: String Tipo de consulta: streaming y lote
Esta estrategia se utiliza cuando el desplazamiento inicial especificado por marca temporal (ya sea global o por partición) no coincide con el desplazamiento que ha devuelto Kafka. Las estrategias disponibles son las siguientes:
  • "error": se produce un error en la consulta.
  • "latest": asigna el desplazamiento más reciente para estas particiones para que Spark pueda leer registros más recientes de estas particiones en microprocesos posteriores.

Valor predeterminado: "error"
startingTimestamp
Tipo: String Tipo de consulta: streaming y lote
Valor de cadena de la marca de tiempo en milisegundos desde
1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento se ajustará al valor de la opción startingOffsetsByTimestampStrategy.
startingTimestamp tiene prioridad sobre startingOffsetsByTimestamp y startingOffsets.
Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta comenzarán antes.
Valor predeterminado: ninguno

Nota:

El desplazamiento devuelto para cada partición es el desplazamiento más antiguo cuya marca de tiempo es igual o mayor que la marca de tiempo especificada en la partición correspondiente. El comportamiento varía entre opciones si Kafka no devuelve el desplazamiento coincidente: compruebe la descripción de cada opción.

Spark simplemente pasa la información de marca de tiempo a KafkaConsumer.offsetsForTimes y no interpreta ni razona sobre el valor. Para obtener más información sobre KafkaConsumer.offsetsForTimes, consulte la documentación. Además, el significado de la marca de tiempo de aquí puede variar según la configuración de Kafka (log.message.timestamp.type). Para obtener más información, consulte la documentación de Apache Kafka.