read_kafka
테이블 반환 함수
적용 대상: Databricks SQL Databricks Runtime 13.3 LTS 이상
Apache Kafka 클러스터에서 데이터를 읽고 테이블 형식으로 데이터를 반환합니다.
하나 이상의 Kafka 토픽에서 데이터를 읽을 수 있습니다. 일괄 처리 쿼리와 스트리밍 수집을 모두 지원합니다.
구문
read_kafka([option_key => option_value ] [, ...])
인수
이 함수에는 명명된 매개 변수 호출이 필요합니다.
option_key
: 구성할 옵션의 이름입니다. 점()을 포함하는 옵션에는 백틱(.
')을 사용해야 합니다.option_value
: 옵션을 설정하는 상수 식입니다. 리터럴 및 스칼라 함수를 허용합니다.
반품
다음 스키마를 사용하여 Apache Kafka 클러스터에서 읽은 레코드:
key BINARY
: Kafka 레코드의 키입니다.value BINARY NOT NULL
: Kafka 레코드의 값입니다.topic STRING NOT NULL
: 레코드를 읽은 Kafka 토픽의 이름입니다.partition INT NOT NULL
: 레코드를 읽은 Kafka 파티션의 ID입니다.offset BIGINT NOT NULL
: KafkaTopicPartition
에 있는 레코드의 오프셋 번호입니다.timestamp TIMESTAMP NOT NULL
: 레코드의 타임스탬프 값입니다. 열은timestampType
이 타임스탬프가 해당하는 항목을 정의합니다.timestampType INTEGER NOT NULL
: 열에 지정된timestamp
타임스탬프의 형식입니다.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: 레코드의 일부로 제공되는 헤더 값입니다(사용하도록 설정된 경우).
예제
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
옵션
Apache Spark 설명서에서 자세한 옵션 목록을 찾을 수 있습니다.
필수 옵션
Kafka 클러스터에 연결하기 위한 아래 옵션을 제공합니다.
옵션 |
---|
bootstrapServers 유형: String Kafka 클러스터를 가리키는 호스트/포트 쌍의 쉼표로 구분된 목록입니다. 기본값: 없음 |
아래 옵션 중 하나만 제공하여 데이터를 가져올 Kafka 토픽을 구성합니다.
옵션 |
---|
assign 유형: String 사용할 특정 토픽 파티션을 포함하는 JSON 문자열입니다. 예를 들어 '{"topicA":[0,1],"topicB":[2,4]}' topicA의 0번째 및 첫 번째 파티션은 소비됩니다.기본값: 없음 |
subscribe 유형: String 읽을 Kafka 항목의 쉼표로 구분된 목록입니다. 기본값: 없음 |
subscribePattern 유형: String 구독할 정규식 일치 항목입니다. 기본값: 없음 |
기타 옵션
read_kafka
는 일괄 처리 쿼리 및 스트리밍 쿼리에서 사용할 수 있습니다. 아래 옵션은 적용할 쿼리 유형을 지정합니다.
옵션 |
---|
endingOffsets 형식: String 쿼리 형식: 일괄 처리만일괄 처리 쿼리 "latest" 를 위해 최신 레코드를 지정하거나 각 TopicPartition에 대한 끝 오프셋을 지정하는 JSON 문자열까지 읽을 오프셋입니다. JSON -1 에서 오프셋으로 최신 항목을 참조하는 데 사용할 수 있습니다. -2 (가장 이른) 오프셋은 허용되지 않습니다.기본값: "latest" |
endingOffsetsByTimestamp 형식: String 쿼리 형식: 일괄 처리만각 TopicPartition에 대해 읽을 끝 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예: 밀리초)으로 제공되어야 합니다. 1970-01-01 00:00:00 UTC 1686444353000 . 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요.endingOffsetsByTimestamp 는 .보다 endingOffsets 우선합니다.기본값: 없음 |
endingTimestamp 형식: String 쿼리 형식: 일괄 처리만이후 타임스탬프의 문자열 값(밀리초) 1970-01-01 00:00:00 UTC 예를 들면 다음과 같습니다 "1686444353000" . Kafka가 일치하는 오프셋을 반환하지 않으면 오프셋이 최신으로 설정됩니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. 참고: endingTimestamp 우선 순위 endingOffsetsByTimestamp 및endingOffsets .기본값: 없음 |
includeHeaders 형식: Boolean 쿼리 형식: 스트리밍 및 일괄 처리행에 Kafka 헤더를 포함할지 여부입니다. 기본값: false |
kafka.<consumer_option> 형식: String 쿼리 형식: 스트리밍 및 일괄 처리모든 Kafka 소비자별 옵션은 접두사를 사용하여 kafka. 전달할 수 있습니다. 이러한 옵션은 제공될 때 백틱으로 묶어야 합니다. 그렇지 않으면 파서 오류가 발생합니다. Kafka 설명서에서 옵션을 찾을 수 있습니다.참고: 이 함수를 사용하여 다음 옵션을 설정하면 안 됩니다. key.deserializer , value.deserializer , bootstrap.servers group.id 기본값: 없음 |
maxOffsetsPerTrigger 형식: Long 쿼리 형식: 스트리밍만트리거 간격당 처리되는 최대 오프셋 수 또는 행 수에 대한 속도 제한입니다. 지정된 총 오프셋 수는 TopicPartitions 간에 비례적으로 분할됩니다. 기본값: 없음 |
startingOffsets 형식: String 쿼리 형식: 스트리밍 및 일괄 처리쿼리가 시작될 때의 시작점이며, "earliest" 이는 가장 빠른 오프셋에서 온 것이며, "latest" 이는 최신 오프셋에서만 발생하거나 각 TopicPartition에 대한 시작 오프셋을 지정하는 JSON 문자열입니다. JSON -2 에서는 오프셋으로 가장 -1 빠른 항목을 최신으로 참조하는 데 사용할 수 있습니다.참고: 일괄 처리 쿼리의 경우 최신(암시적으로 또는 JSON에서 -1 사용)은 허용되지 않습니다. 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다. 기본값: "latest" 스트리밍의 경우, "earliest" 일괄 처리의 경우 |
startingOffsetsByTimestamp 형식: String 쿼리 형식: 스트리밍 및 일괄 처리각 TopicPartition에 대한 시작 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예 1686444353000 : 밀리초1970-01-01 00:00:00 UTC )으로 제공되어야 합니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy 의 값을 따릅니다.startingOffsetsByTimestamp 는 .보다 startingOffsets 우선합니다.참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다. 기본값: 없음 |
startingOffsetsByTimestampStrategy 형식: String 쿼리 형식: 스트리밍 및 일괄 처리이 전략은 타임스탬프별 지정된 시작 오프셋(전역 또는 파티션당)이 반환된 Kafka 오프셋과 일치하지 않는 경우에 사용됩니다. 사용 가능한 전략은 다음과 같습니다. - "error" : 쿼리 실패- "latest" : Spark가 이러한 파티션의 최신 레코드를 나중에 마이크로 일괄 처리로 읽을 수 있도록 이러한 파티션에 대한 최신 오프셋을 할당합니다.기본값: "error" |
startingTimestamp 형식: String 쿼리 형식: 스트리밍 및 일괄 처리이후 타임스탬프의 문자열 값(밀리초) 1970-01-01 00:00:00 UTC 예를 들면 다음과 같습니다 "1686444353000" . 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy 의 값을 따릅니다.startingTimestamp 는 우선 startingOffsetsByTimestamp 순위를 하며 .startingOffsets 참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사점에서 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션이 가장 일찍 시작됩니다. 기본값: 없음 |
참고 항목
각 파티션에 대해 반환된 오프셋은 타임스탬프가 해당 파티션의 지정된 타임스탬프보다 크거나 같은 가장 빠른 오프셋입니다. Kafka가 일치하는 오프셋을 반환하지 않는 경우 동작은 옵션에 따라 다릅니다. 각 옵션에 대한 설명을 확인합니다.
Spark는 단순히 타임스탬프 정보를 KafkaConsumer.offsetsForTimes
전달하며 값에 대한 해석이나 이유를 전달하지 않습니다. 자세한 내용은 KafkaConsumer.offsetsForTimes
설명서를 참조하세요. 또한 여기서 타임스탬프의 의미는 Kafka 구성(log.message.timestamp.type
)에 따라 달라질 수 있습니다. 자세한 내용은 Apache Kafka 설명서를 참조 하세요.