read_kafka
tablo değerli işlev
Şunlar için geçerlidir: Databricks SQL Databricks Runtime 13.3 LTS ve üzeri
Apache Kafka kümesindeki verileri okur ve tablo biçiminde döndürür.
Bir veya daha fazla Kafka konu başlığındaki verileri okuyabilir. Hem toplu sorgular hem de akış alımını destekler.
Söz dizimi
read_kafka([option_key => option_value ] [, ...])
Bağımsız değişkenler
Bu işlev adlandırılmış parametre çağırması gerektirir.
option_key
: Yapılandırılan seçeneğin adı. Nokta () içeren seçenekler için backticks.
(') kullanmanız gerekir.option_value
: Seçeneği ayarlamak için sabit bir ifade. Değişmez değerleri ve skaler işlevleri kabul eder.
Döndürülenler
Aşağıdaki şemaya sahip bir Apache Kafka kümesinden okunan kayıtlar:
key BINARY
: Kafka kaydının anahtarı.value BINARY NOT NULL
: Kafka kaydının değeri.topic STRING NOT NULL
: Kaydın okunduğu Kafka konusunun adı.partition INT NOT NULL
: Kaydın okunduğu Kafka bölümünün kimliği.offset BIGINT NOT NULL
: Kafka'dakiTopicPartition
kaydın uzaklık numarası.timestamp TIMESTAMP NOT NULL
: Kayıt için bir zaman damgası değeri. sütun,timestampType
bu zaman damgasının neye karşılık olduğunu tanımlar.timestampType INTEGER NOT NULL
: Sütunda belirtilen zaman damgasınıntimestamp
türü.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Kaydın parçası olarak sağlanan üst bilgi değerleri (etkinse).
Örnekler
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events',
startingOffsets => 'earliest',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
);
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Seçenekler
Seçeneklerin ayrıntılı listesini Apache Spark belgelerinde bulabilirsiniz.
Gerekli seçenekler
Kafka kümenize bağlanmak için aşağıdaki seçeneği belirtin.
Seçenek |
---|
bootstrapServers Tür: String Kafka kümesine işaret eden konak/bağlantı noktası çiftlerinin virgülle ayrılmış listesi. Varsayılan değer: Yok |
Veri çekebileceğiniz Kafka konularını yapılandırmak için aşağıdaki seçeneklerden yalnızca birini belirtin.
Seçenek |
---|
assign Tür: String Kullanılacak belirli konu bölümlerini içeren bir JSON dizesi. Örneğin, için '{"topicA":[0,1],"topicB":[2,4]}' topicA'nın 0 ve 1. bölümleri kullanılacaktır.Varsayılan değer: Yok |
subscribe Tür: String Okunacak Kafka konularının virgülle ayrılmış listesi. Varsayılan değer: Yok |
subscribePattern Tür: String Abone olunacak konularla eşleşen normal ifade. Varsayılan değer: Yok |
Çeşitli seçenekler
read_kafka
toplu sorgularda ve akış sorgularında kullanılabilir. Aşağıdaki seçenekler, hangi sorgu türüne uygulanacağını belirtir.
Seçenek |
---|
endingOffsets Tür: String Sorgu Türü: yalnızca toplu işEn son kayıtları belirtmek üzere bir toplu iş sorgusuna "latest" kadar okunacak uzaklıklar veya her TopicPartition için bitiş uzaklığını belirten bir JSON dizesi. JSON'da, -1 en son sürüme başvurmak için uzaklık kullanılabilir. -2 uzaklık olarak (en erken) izin verilmez.Varsayılan değer: "latest" |
endingOffsetsByTimestamp Tür: String Sorgu Türü: yalnızca toplu işHer TopicPartition için okunacak bitiş zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, örneğin tarihinden itibaren 1970-01-01 00:00:00 UTC milisaniye cinsinden uzun bir zaman damgası değeri olarak sağlanması gerekir.1686444353000 . Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın.endingOffsetsByTimestamp ' den önceliklidir endingOffsets .Varsayılan değer: Yok |
endingTimestamp Tür: String Sorgu Türü: yalnızca toplu işBu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri 1970-01-01 00:00:00 UTC , örneğin "1686444353000" . Kafka eşleşen uzaklığı döndürmezse, uzaklık en son olarak ayarlanır. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Not: endingTimestamp ve'den önceliklidir endingOffsetsByTimestamp endingOffsets .Varsayılan değer: Yok |
includeHeaders Tür: Boolean Sorgu Türü: akış ve toplu işKafka üst bilgilerinin satıra eklenip eklenmeyeceği. Varsayılan değer: false |
kafka.<consumer_option> Tür: String Sorgu Türü: akış ve toplu işKafka tüketicisine özgü seçenekler ön ek ile geçirilebilir kafka. . Bu seçeneklerin sağlandığında arka uçlarla çevrelenmiş olması gerekir, aksi takdirde ayrıştırıcı hatası alırsınız. Seçenekleri Kafka belgelerinde bulabilirsiniz.Not: Bu işlevle aşağıdaki seçenekleri ayarlamamalısınız: key.deserializer , value.deserializer , bootstrap.servers , group.id Varsayılan değer: Yok |
maxOffsetsPerTrigger Tür: Long Sorgu Türü: yalnızca akışTetikleyici aralığı başına işlenen en fazla uzaklık veya satır sayısı için hız sınırı. Belirtilen toplam uzaklık sayısı, TopicPartitions arasında orantılı olarak bölünecek. Varsayılan değer: Yok |
startingOffsets Tür: String Sorgu Türü: akış ve toplu işYalnızca en son uzaklıklardan gelen en eski uzaklıklardan "latest" veya her TopicPartition için başlangıç uzaklığını belirten bir JSON dizesinden oluşan sorgunun başlatıldığı "earliest" başlangıç noktası. JSON'da, -2 en erkene ve -1 en son sürüme başvurmak için bir uzaklık kullanılabilir.Not: Toplu sorgular için en son (örtük olarak veya JSON'da -1 kullanılarak) izin verilmez. Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar. Varsayılan değer: "latest" akış için, "earliest" toplu iş için |
startingOffsetsByTimestamp Tür: String Sorgu Türü: akış ve toplu işHer TopicPartition için başlangıç zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, örneğin 1686444353000 tarihinden itibaren 1970-01-01 00:00:00 UTC milisaniye cinsinden uzun bir zaman damgası değeri olarak sağlanması gerekir. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış seçeneğinin startingOffsetsByTimestampStrategy değerini izler.startingOffsetsByTimestamp ' den önceliklidir startingOffsets .Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar. Varsayılan değer: Yok |
startingOffsetsByTimestampStrategy Tür: String Sorgu Türü: akış ve toplu işBu strateji, zaman damgasına göre belirtilen başlangıç uzaklığı (genel veya bölüm başına) kafka'nın döndürülen uzaklığıyla eşleşmediğinde kullanılır. Kullanılabilir stratejiler şunlardır: * "error" : sorguyu başarısızlığa uğratma* "latest" : Spark'ın daha sonraki mikro toplu işlemlerde bu bölümlerden daha yeni kayıtları okuyabilmesi için bu bölümler için en son uzaklığı atar.Varsayılan değer: "error" |
startingTimestamp Tür: String Sorgu Türü: akış ve toplu işBu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri 1970-01-01 00:00:00 UTC , örneğin "1686444353000" . Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış seçeneğinin startingOffsetsByTimestampStrategy değerini izler.startingTimestamp ve startingOffsets üzerinde startingOffsetsByTimestamp önceliklidir.Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar. Varsayılan değer: Yok |
Not
Her bölüm için döndürülen uzaklık, zaman damgası ilgili bölümde verilen zaman damgasından büyük veya buna eşit olan en erken uzaklıktır. Kafka eşleşen uzaklığı döndürmezse davranış seçenekler arasında değişir. Her seçeneğin açıklamasını denetleyin.
Spark yalnızca zaman damgası bilgilerini öğesine KafkaConsumer.offsetsForTimes
geçirir ve değeri yorumlamaz veya neden göstermez. hakkında KafkaConsumer.offsetsForTimes
daha fazla bilgi için lütfen belgelere bakın. Ayrıca, burada zaman damgasının anlamı Kafka yapılandırmasına (log.message.timestamp.type
) göre değişebilir. Ayrıntılar için Apache Kafka belgelerine bakın.
İlgili makaleler
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin