Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Şunlar için geçerlidir:
Databricks SQL
Databricks Runtime 13.3 LTS ve üzeri
Apache Kafka kümesindeki verileri okur ve tablo biçiminde döndürür.
Bir veya daha fazla Kafka konu başlığındaki verileri okuyabilir. Hem toplu sorguları hem de akış verisi işlemesini destekler.
Söz dizimi
read_kafka([option_key => option_value ] [, ...])
Tartışmalar
Bu işlev adlandırılmış parametre çağırması gerektirir.
-
option_key: Yapılandırılan seçeneğin adı. Backticks () for options that contain dots (.`) kullanmanız gerekir. -
option_value: Seçeneği ayarlamak için sabit bir ifade . Değişmez değerleri ve skalar fonksiyonları kabul eder.
İadeler
Aşağıdaki şemaya sahip bir Apache Kafka kümesinden okunan kayıtlar:
-
key BINARY: Kafka kaydının anahtarı. -
value BINARY NOT NULL: Kafka kaydının değeri. -
topic STRING NOT NULL: Kaydın okunduğu Kafka konusunun adı. -
partition INT NOT NULL: Kaydın okunduğu Kafka bölümünün kimliği. -
offset BIGINT NOT NULL: Kafka kaydınınTopicPartitionofset numarası. -
timestamp TIMESTAMP NOT NULL: Kayıt için bir zaman damgası değeri.timestampTypesütunu bu zaman damgasının neye karşılık olduğunu tanımlar. -
timestampType INTEGER NOT NULL:timestampsütununda belirtilen zaman damgasının türü. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Kayıtın bir parçası olarak sağlanan başlık (header) değerleri (etkin olduğunda).
Örnekler
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Seçenekler
Seçeneklerin ayrıntılı listesini Seçenekler'de bulabilirsiniz.
Gerekli seçenekler
Kafka kümenize bağlanmak için aşağıdaki seçeneği belirtin.
| Seçenek |
|---|
bootstrapServersTür: StringKafka kümesine işaret eden konak/bağlantı noktası çiftlerinin virgülle ayrılmış listesi. Varsayılan değer: Yok |
Veri çekebileceğiniz Kafka konularını yapılandırmak için aşağıdaki seçeneklerden yalnızca birini belirtin.
| Seçenek |
|---|
assignTür: StringKullanılacak belirli konu bölümlerini içeren bir JSON dizesi. Örneğin, '{"topicA":[0,1],"topicB":[2,4]}' için topicA'nın 0'ıncı ve 1'inci bölümlerinden tüketilecektir.Varsayılan değer: Yok |
subscribeTür: StringOkunacak Kafka konularının virgülle ayrılmış listesi. Varsayılan değer: Yok |
subscribePatternTür: StringAbone olunacak konuları eşleştiren düzenli ifade. Varsayılan değer: Yok |
Çeşitli seçenekler
read_kafka toplu sorgularda ve akış sorgularında kullanılabilir. Aşağıdaki seçenekler, hangi sorgu türüne uygulanacağını belirtir.
| Seçenek |
|---|
endingOffsetsTür: String Sorgu tipi: sadece topluBir toplu sorguda okunacak konumlar, en son kayıtları belirtmek için "latest" veya her bir TopicPartition için bitiş konumunu belirten bir JSON dizesidir. JSON'da, en son durumu belirtmek için bir ofset olarak -1 kullanılabilir. "Ofset olarak -2 (en erken) izin verilmiyor."Varsayılan değer: "latest" |
endingOffsetsByTimestampTür: String Sorgu tipi: sadece topluHer TopicPartition için okunacak bitiş zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, 1970-01-01 00:00:00 UTC tarihinden itibaren milisaniye cinsinden uzun bir değer olarak sağlanması gerekir, örneğin.1686444353000. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın.endingOffsetsByTimestamp ' den önceliklidir endingOffsets.Varsayılan değer: Yok |
endingTimestampTür: String Sorgu tipi: sadece topluBu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri 1970-01-01 00:00:00 UTC, örneğin "1686444353000". Kafka eşleşen uzaklığı döndürmezse, uzaklık en son olarak ayarlanır. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Not: endingTimestamp, endingOffsetsByTimestamp'e göre önceliklidir.endingOffsets.Varsayılan değer: Yok |
includeHeadersTür: Boolean Sorgu Türü: akış ve yığın işlemeKafka üst bilgilerinin satıra eklenip eklenmeyeceği. Varsayılan değer: false |
kafka.<consumer_option>Tür: String Sorgu Türü: akış ve yığın işlemeKafka tüketicisine özgü herhangi bir seçenek, kafka. öneki ile birlikte geçirilebilir. Bu seçenekler verildiğinde ters tırnak işaretleriyle çevrelenmelidir, aksi takdirde ayrıştırıcı hatası alırsınız. Seçenekleri Kafka belgelerinde bulabilirsiniz.Not: Bu işlevle aşağıdaki seçenekleri ayarlamamalısınız: key.deserializer, value.deserializer, bootstrap.servers, group.idVarsayılan değer: Yok |
maxOffsetsPerTriggerTür: Long Sorgu Türü: yalnızca akışTetikleme aralığı başına işlenecek maksimum offset veya satır sayısı için hız sınırı. Belirtilen toplam offset sayısı, TopicPartitions arasında orantılı olarak bölünecek. Varsayılan değer: Yok |
startingOffsetsTür: String Sorgu Türü: akış ve yığın işlemeSorgunun başlatıldığı başlangıç noktası, en erken uzaklıklardan gelen "earliest", yalnızca en son uzaklıklardan gelen "latest" veya her TopicPartition için başlangıç uzaklığını belirten bir JSON dizesidir. JSON'da, başlangıç noktası olarak -2 en erken zamana, -1 en son zamana başvurmak için kullanılabilir.Not: Toplu sorgular için en son (örtük olarak veya JSON'da -1 kullanılarak) izin verilmez. Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar. Varsayılan değer: "latest" streaming için, "earliest" batch için |
startingOffsetsByTimestampTür: String Sorgu Türü: akış ve yığın işlemeHer TopicPartition için başlangıç zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, 1970-01-01 00:00:00 UTC tarihinden itibaren, örneğin 1686444353000, milisaniye cinsinden uzun bir değer olarak sağlanması gerekir. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış startingOffsetsByTimestampStrategy seçeneğinin değerine bağlı olacak.startingOffsetsByTimestamp ' den önceliklidir startingOffsets.Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar. Varsayılan değer: Yok |
startingOffsetsByTimestampStrategyTür: String Sorgu Türü: akış ve yığın işlemeBu strateji, zaman damgasına göre belirtilen başlangıç uzaklığı (genel veya bölüm başına) Kafka'nın döndürdüğü uzaklıkla eşleşmediğinde kullanılır. Kullanılabilir stratejiler şunlardır:
Varsayılan değer: "error" |
startingTimestampTür: String Sorgu Türü: akış ve yığın işlemeBu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri 1970-01-01 00:00:00 UTC, örneğin "1686444353000". Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış startingOffsetsByTimestampStrategy seçeneğinin değerine bağlı olacak.startingTimestampve startingOffsetsByTimestampüzerinde startingOffsets önceliklidir.Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Bir sorgu sırasında yeni keşfedilen bölümler daima en erken başlar. Varsayılan değer: Yok |
Not
Her bölüm için döndürülen uzaklık, zaman damgası ilgili bölümde verilen zaman damgasından büyük veya buna eşit olan en erken uzaklıktır. Kafka eşleşen uzaklığı döndürmezse davranış seçenekler arasında değişir. Her seçeneğin açıklamasını denetleyin.
Spark, zaman damgası bilgilerini basitçe KafkaConsumer.offsetsForTimes öğesine iletir ve değeri yorumlamaz veya üzerinde düşünmez. hakkında KafkaConsumer.offsetsForTimesdaha fazla bilgi için lütfen belgelere bakın. Ayrıca, burada zaman damgasının anlamı Kafka yapılandırmasına (log.message.timestamp.type) göre değişebilir. Ayrıntılar için Apache Kafka belgelerine bakın.