Partilhar via


read_kafka função com valor de tabela

Aplica-se a: Marque Sim Databricks SQL Marque Sim Databricks Runtime 13.3 LTS e superior

Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.

Pode ler dados de um ou mais tópicos de Kafka. Ele suporta consultas em lote e ingestão de streaming.

Sintaxe

read_kafka([option_key => option_value ] [, ...])

Argumentos

Esta função requer invocação de parâmetro nomeado.

  • option_key: O nome da opção a ser configurada. Você deve usar backticks (') para opções que contêm pontos (.).
  • option_value: Uma expressão constante para definir a opção. Aceita literais e funções escalares.

Devoluções

Registros lidos de um cluster Apache Kafka com o seguinte esquema:

  • key BINARY: A chave do disco de Kafka.
  • value BINARY NOT NULL: O valor do registro de Kafka.
  • topic STRING NOT NULL: O nome do tópico Kafka do qual o registro é lido.
  • partition INT NOT NULL: O ID da partição Kafka a partir da qual o registro é lido.
  • offset BIGINT NOT NULL: O número de deslocamento do registro no Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Um valor de carimbo de data/hora para o registro. A timestampType coluna define a que corresponde esse carimbo de data/hora.
  • timestampType INTEGER NOT NULL: O tipo do carimbo de data/hora especificado na timestamp coluna.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Valores de cabeçalho fornecidos como parte do registro (se habilitado).

Exemplos

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opções

Você pode encontrar uma lista detalhada de opções na documentação do Apache Spark.

Opções necessárias

Forneça a opção abaixo para se conectar ao seu cluster Kafka.

Opção
bootstrapServers

Tipo: String

Uma lista separada por vírgulas de pares host/porta apontando para o cluster Kafka.

Valor padrão: Nenhum

Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka extrair dados.

Opção
assign

Tipo: String

Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}', as partições 0'th e 1st do topicA serão consumidas a partir de.

Valor padrão: Nenhum
subscribe

Tipo: String

Uma lista separada por vírgulas de tópicos de Kafka para ler.

Valor padrão: Nenhum
subscribePattern

Tipo: String

Uma expressão regular que corresponde aos tópicos para assinar.

Valor padrão: Nenhum

Opções diversas

read_kafka pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.

Opção
endingOffsets

Tipo: Tipo de consulta: String apenas lote

Os deslocamentos a serem lidos até para uma consulta em lotes, seja "latest" para especificar os registros mais recentes ou uma cadeia de caracteres JSON especificando um deslocamento final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente. -2 (mais cedo) como um deslocamento não é permitido.

Valor predefinido: "latest"
endingOffsetsByTimestamp

Tipo: Tipo de consulta: String apenas lote

Uma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo
1686444353000. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora.
endingOffsetsByTimestamp tem precedência sobre endingOffsets.

Valor padrão: Nenhum
endingTimestamp

Tipo: Tipo de consulta: String apenas lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Se Kafka não retornar a compensação correspondente, a compensação será definida como a mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp e
endingOffsets.

Valor padrão: Nenhum
includeHeaders

Tipo: Tipo de consulta: Boolean streaming e lote

Se os cabeçalhos de Kafka devem ser incluídos na linha.

Valor predefinido: false
kafka.<consumer_option>

Tipo: Tipo de consulta: String streaming e lote

Qualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você receberá um erro de analisador. Você pode encontrar as opções na documentação de Kafka.

Nota: Não deve definir as seguintes opções com esta função:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Valor padrão: Nenhum
maxOffsetsPerTrigger

Tipo: Long Tipo de consulta: apenas streaming

Limite de taxa para o número máximo de deslocamentos ou linhas processadas por intervalo de gatilho. O número total especificado de deslocamentos será dividido proporcionalmente em TopicPartitions.

Valor padrão: Nenhum
startingOffsets

Tipo: Tipo de consulta: String streaming e lote

O ponto inicial quando uma consulta é iniciada, que "earliest" é dos primeiros deslocamentos, "latest" que é apenas dos deslocamentos mais recentes, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um offset pode ser usado para se referir ao mais antigo, -1 ao mais recente.

Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em JSON) não é permitido. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo.

Valor padrão: "latest" para streaming, "earliest" para lote
startingOffsetsByTimestamp

Tipo: Tipo de consulta: String streaming e lote

Uma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC, por exemplo 1686444353000, . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp tem precedência sobre startingOffsets.

Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo.

Valor padrão: Nenhum
startingOffsetsByTimestampStrategy

Tipo: Tipo de consulta: String streaming e lote

Essa estratégia é usada quando o deslocamento inicial especificado por carimbo de data/hora (global ou por partição) não corresponde ao deslocamento que Kafka retornou. As estratégias disponíveis são:

* "error": falhar na consulta
* "latest": atribui o deslocamento mais recente para essas partições para que o Spark possa ler registros mais recentes dessas partições em microlotes posteriores.

Valor predefinido: "error"
startingTimestamp

Tipo: Tipo de consulta: String streaming e lote

Um valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde
1970-01-01 00:00:00 UTC, por exemplo "1686444353000". Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy.
startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets.

Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas mais cedo.

Valor padrão: Nenhum

Nota

O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se Kafka não retornar o deslocamento correspondente - verifique a descrição de cada opção.

O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimeso , e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimeso , consulte a documentação. Além disso, o significado de carimbo de data/hora aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type). Para obter detalhes, consulte a documentação do Apache Kafka.