Функция с табличным значением read_kafka

Область применения:проверка помечены да Databricks SQL проверка помечены да Databricks Runtime 13.3 LTS и выше

Считывает данные из кластера Apache Kafka и возвращает данные в табличной форме.

Может считывать данные из одной или нескольких тем Kafka. Он поддерживает как пакетные запросы, так и прием потоковой передачи.

Синтаксис

read_kafka([option_key => option_value ] [, ...])

Аргументы

Для этой функции требуется вызов именованного параметра.

  • option_key: имя параметра для настройки. Для параметров, содержащих точки (), необходимо использовать обратные знаки (.').
  • option_value: константное выражение для задания параметра. Принимает литералы и скалярные функции.

Возвраты

Записи считываются из кластера Apache Kafka со следующей схемой:

  • key BINARY: ключ записи Kafka.
  • value BINARY NOT NULL: значение записи Kafka.
  • topic STRING NOT NULL: имя раздела Kafka, из нее считывается запись.
  • partition INT NOT NULL: идентификатор секции Kafka, из нее считывается запись.
  • offset BIGINT NOT NULL: число смещения записи в Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: значение метки времени для записи. Столбец timestampType определяет, к чему соответствует эта метка времени.
  • timestampType INTEGER NOT NULL: тип метки времени, указанной в столбце timestamp .
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: значения заголовков, предоставленные в рамках записи (если включена).

Примеры

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Параметры

Подробный список параметров можно найти в документации Apache Spark.

Обязательные параметры

Укажите приведенный ниже вариант для подключения к кластеру Kafka.

Вариант
bootstrapServers

Тип: String

Список пар узлов и портов, разделенных запятыми, указывающих на кластер Kafka.

Значение по умолчанию: нет

Укажите только один из приведенных ниже вариантов, чтобы настроить, какие разделы Kafka будут извлекать данные из.

Вариант
assign

Тип: String

Строка JSON, содержащая определенные секции раздела для использования. Например, для '{"topicA":[0,1],"topicB":[2,4]}'раздела 0'й и 1-й секций будут использоваться разделы.

Значение по умолчанию: нет
subscribe

Тип: String

Разделенный запятыми список разделов Kafka для чтения.

Значение по умолчанию: нет
subscribePattern

Тип: String

Регулярное выражение, соответствующее темам для подписки.

Значение по умолчанию: нет

Прочие параметры

read_kafka можно использовать в пакетных запросах и в потоковых запросах. Приведенные ниже параметры указывают тип запроса, к которому они применяются.

Вариант
endingOffsets

Тип запроса: String только пакет

Смещения для чтения до пакетного запроса либо для указания последних записей, либо "latest" строки JSON, указывающей конечное смещение для каждого раздела. В формате JSON -1 можно использовать смещение для ссылки на последнюю версию. -2 (самое раннее) как смещение не допускается.

Значение по умолчанию: "latest"
endingOffsetsByTimestamp

Тип запроса: String только пакет

Строка JSON, указывающая конечную метку времени для чтения до каждого раздела. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC, например,
1686444353000. Дополнительные сведения о поведении с метками времени см . ниже .
endingOffsetsByTimestamp имеет больший приоритет, чем endingOffsets.

Значение по умолчанию: нет
endingTimestamp

Тип запроса: String только пакет

Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Если Kafka не возвращает соответствующее смещение, то для смещения будет задано значение latest. Дополнительные сведения о поведении с метками времени см . ниже . Примечание. endingTimestamp Имеет приоритет над endingOffsetsByTimestamp и
endingOffsets.

Значение по умолчанию: нет
includeHeaders

Тип запроса: Boolean потоковая передача и пакет

Следует ли включать заголовки Kafka в строку.

Значение по умолчанию: false
kafka.<consumer_option>

Тип запроса: String потоковая передача и пакет

Любые параметры для конкретного потребителя Kafka можно передать с kafka. префиксом. Эти параметры должны быть окружены обратными знаками при указании, в противном случае вы получите ошибку синтаксического анализа. Параметры можно найти в документации Kafka.

Примечание. Для этой функции не следует задавать следующие параметры:
key.deserializer, , value.deserializerbootstrap.serversgroup.id

Значение по умолчанию: нет
maxOffsetsPerTrigger

Тип запроса: Long только потоковая передача

Ограничение скорости на максимальное количество смещения или строк, обработанных на интервал триггера. Указанное общее количество смещения будет пропорционально разделено по разделам.

Значение по умолчанию: нет
startingOffsets

Тип запроса: String потоковая передача и пакет

Начальная точка при запуске запроса, "earliest" либо из самых ранних смещения, "latest" которая находится только из последних смещения, либо строка JSON, указывающая начальное смещение для каждого раздела. В формате JSON -2 смещение можно использовать для ссылки на самые ранние и -1 последние.

Примечание. Для пакетных запросов последние (неявно или с помощью -1 в JSON) запрещены. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться из смещений, определенных в проверка point запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Значение по умолчанию: "latest" для потоковой передачи "earliest" для пакетной службы
startingOffsetsByTimestamp

Тип запроса: String потоковая передача и пакет

Строка JSON, указывающая начальную метку времени для каждой темыPartition. Метки времени должны быть предоставлены в виде длинного значения метки времени в миллисекундах, так как 1970-01-01 00:00:00 UTC, например 1686444353000. Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующее смещение, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp имеет больший приоритет, чем startingOffsets.

Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться из смещений, определенных в проверка point запроса. Недавно обнаруженные секции во время запроса будут начинаться в самое ближайшее время.

Значение по умолчанию: нет
startingOffsetsByTimestampStrategy

Тип запроса: String потоковая передача и пакет

Эта стратегия используется, если указанная начальная смещение по метке времени (глобальной или каждой секции) не соответствует возвращаемой смещению Kafka. Доступные стратегии:

* "error": сбой запроса
* "latest": назначает последнее смещение для этих секций, чтобы Spark могли считывать новые записи из этих секций в последующих микропакетах.

Значение по умолчанию: "error"
startingTimestamp

Тип запроса: String потоковая передача и пакет

Строковое значение метки времени в миллисекундах с тех пор
1970-01-01 00:00:00 UTCнапример "1686444353000". Дополнительные сведения о поведении с метками времени см . ниже . Если Kafka не возвращает соответствующее смещение, поведение будет соответствовать значению параметра startingOffsetsByTimestampStrategy.
startingTimestamp имеет приоритет над startingOffsetsByTimestamp и startingOffsets.

Примечание. Для потоковых запросов это применяется только при запуске нового запроса. Перезапущенные запросы потоковой передачи будут продолжаться из смещений, определенных в проверка point запроса. Недавно обнаруженные секции во время запроса начнутся самым ранним.

Значение по умолчанию: нет

Примечание.

Возвращаемое смещение для каждой секции является самым ранним смещением, метка времени которого больше или равна заданной метке времени в соответствующей секции. Поведение зависит от параметров, если Kafka не возвращает соответствующее смещение - проверка описание каждого параметра.

Spark просто передает сведения о KafkaConsumer.offsetsForTimesметке времени и не интерпретирует или не определяет значение. Дополнительные сведения KafkaConsumer.offsetsForTimesсм. в документации. Кроме того, значение метки времени здесь может отличаться в зависимости от конфигурации Kafka (log.message.timestamp.type). Дополнительные сведения см. в документации по Apache Kafka.