Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Se aplica a:
Databricks SQL
Databricks Runtime 13.3 LTS y versiones posteriores
Lee datos de un clúster de Apache Kafka y los devuelve en formato tabular.
Puede leer datos de uno o varios temas de Kafka. Admite tanto las consultas por lotes como la ingesta de streaming.
Sintaxis
read_kafka([option_key => option_value ] [, ...])
Argumentos
Esta función requiere la invocación de parámetros con nombre.
-
option_key: nombre de la opción que se va a configurar. Debe usar comillas invertidas () for options that contain dots (). -
option_value: expresión constante para establecer la opción. Acepta literales y funciones escalares.
Devoluciones
Registros leídos de un clúster de Apache Kafka con el siguiente esquema:
-
key BINARY: clave del registro de Kafka. -
value BINARY NOT NULL: valor del registro de Kafka. -
topic STRING NOT NULL: nombre del tema de Kafka del que se lee el registro. -
partition INT NOT NULL: identificador de la partición de Kafka de la que se lee el registro. -
offset BIGINT NOT NULL: número de desplazamiento del registro de laTopicPartitionde Kafka. -
timestamp TIMESTAMP NOT NULL: valor de marca de tiempo del registro. La columnatimestampTypedefine a qué corresponde esta marca de tiempo. -
timestampType INTEGER NOT NULL: tipo de la marca de tiempo que se especifica en la columnatimestamp. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: valores de encabezado proporcionados como parte del registro (si está habilitado).
Ejemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opciones
Encontrará una lista detallada de opciones en la documentación de Apache Spark.
Opciones necesarias
Proporcione la siguiente opción para conectarse al clúster de Kafka.
| Opción |
|---|
bootstrapServersTipo: StringLista separada por comas de pares host/puerto que apuntan al clúster de Kafka. Valor predeterminado: ninguno |
Proporcione solo una de las siguientes opciones para configurar de qué temas de Kafka deben extraerse datos.
| Opción |
|---|
assignTipo: StringCadena JSON que contiene las particiones de temas específicas de las que se van a consumir. Por ejemplo, para '{"topicA":[0,1],"topicB":[2,4]}'las particiones 0 y 1ª del temaA se consumirán.Valor predeterminado: ninguno |
subscribeTipo: StringLista separada por comas de temas de Kafka para leer. Valor predeterminado: ninguno |
subscribePatternTipo: StringExpresión regular que coincide con los temas a los que va a suscribirse. Valor predeterminado: ninguno |
Otras opciones
read_kafka se puede usar en consultas por lotes y en consultas de streaming. Las siguientes opciones especifican a qué tipo de consulta se aplican.
| Opción |
|---|
endingOffsetsTipo: String Tipo de consulta: solo loteDesplazamientos hasta los que se va a leer para una consulta por lotes, ya sea "latest" para especificar los registros más recientes o una cadena JSON que especifique un desplazamiento final para cada TopicPartition. En JSON, se puede usar -1 como desplazamiento para referirse al más reciente. No se permite -2 (el más antiguo) como desplazamiento.Valor predeterminado: "latest" |
endingOffsetsByTimestampTipo: String Tipo de consulta: solo loteCadena JSON que especifica una marca de tiempo final hasta la que se va a leer para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC, por ejemplo,1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo.endingOffsetsByTimestamp tiene prioridad sobre endingOffsets.Valor predeterminado: ninguno |
endingTimestampTipo: String Tipo de consulta: solo loteValor de cadena de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Si Kafka no devuelve el desplazamiento coincidente, el desplazamiento se establecerá en más reciente. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Nota: endingTimestamp tiene prioridad sobre endingOffsetsByTimestamp yendingOffsets.Valor predeterminado: ninguno |
includeHeadersTipo: Boolean Tipo de consulta: streaming y loteIndica si se van a incluir los encabezados de Kafka en la fila. Valor predeterminado: false |
kafka.<consumer_option>Tipo: String Tipo de consulta: streaming y loteLas opciones específicas de consumidor de Kafka se pueden pasar con el prefijo kafka.. Estas opciones deben especificarse con acentos graves; de lo contrario, recibirá un error del analizador. Encontrará las opciones en la documentación de Kafka.Nota: No debe establecer las siguientes opciones con esta función: key.deserializer, value.deserializer, , bootstrap.servers, group.idValor predeterminado: ninguno |
maxOffsetsPerTriggerTipo: Long Tipo de consulta: solo streamingNúmero máximo de desplazamientos o filas que se procesan por intervalo del desencadenador. El número total especificado de desplazamientos se dividirá proporcionalmente entre las TopicPartitions. Valor predeterminado: ninguno |
startingOffsetsTipo: String Tipo de consulta: streaming y lotePunto inicial cuando se inicia una consulta, ya sea "earliest", que procede de los desplazamientos más antiguos; "latest", que procede de los desplazamientos más recientes; o bien una cadena JSON que especifique un desplazamiento inicial para cada TopicPartition. En JSON, se puede usar -2 como desplazamiento para referirse al más antiguo y -1 al más reciente.Nota: En el caso de las consultas por lotes, no se permite la más reciente (ya sea implícitamente o mediante el uso de -1 en JSON). Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio. Valor predeterminado: "latest" para streaming y "earliest" para lote |
startingOffsetsByTimestampTipo: String Tipo de consulta: streaming y loteCadena JSON que especifica una marca de tiempo inicial para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC; por ejemplo, 1686444353000. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento se ajustará al valor de la opción startingOffsetsByTimestampStrategy.startingOffsetsByTimestamp tiene prioridad sobre startingOffsets.Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio. Valor predeterminado: ninguno |
startingOffsetsByTimestampStrategyTipo: String Tipo de consulta: streaming y loteEsta estrategia se utiliza cuando el desplazamiento inicial especificado por marca temporal (ya sea global o por partición) no coincide con el desplazamiento que ha devuelto Kafka. Las estrategias disponibles son las siguientes:
Valor predeterminado: "error" |
startingTimestampTipo: String Tipo de consulta: streaming y loteValor de cadena de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC; por ejemplo, "1686444353000". Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento se ajustará al valor de la opción startingOffsetsByTimestampStrategy.startingTimestamp tiene prioridad sobre startingOffsetsByTimestamp y startingOffsets.Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta comenzarán antes. Valor predeterminado: ninguno |
Nota:
El desplazamiento devuelto para cada partición es el desplazamiento más antiguo cuya marca de tiempo es igual o mayor que la marca de tiempo especificada en la partición correspondiente. El comportamiento varía entre opciones si Kafka no devuelve el desplazamiento coincidente: compruebe la descripción de cada opción.
Spark simplemente pasa la información de marca de tiempo a KafkaConsumer.offsetsForTimes y no interpreta ni razona sobre el valor. Para obtener más información sobre KafkaConsumer.offsetsForTimes, consulte la documentación. Además, el significado de la marca de tiempo de aquí puede variar según la configuración de Kafka (log.message.timestamp.type). Para obtener más información, consulte la documentación de Apache Kafka.