read_kafka tablo değerli fonksiyon

Şunlar için geçerlidir:onay işareti evet olarak işaretlenmiş Databricks SQL onay işareti evet olarak işaretlenmiş Databricks Runtime 13.3 LTS ve üzeri

Apache Kafka kümesindeki verileri okur ve tablo biçiminde döndürür.

Bir veya daha fazla Kafka konu başlığındaki verileri okuyabilir. Hem toplu sorguları hem de akış verisi işlemesini destekler.

Söz dizimi

read_kafka([option_key => option_value ] [, ...])

Tartışmalar

Bu işlev adlandırılmış parametre çağırması gerektirir.

  • option_key: Yapılandırılan seçeneğin adı. Backticks () for options that contain dots (.`) kullanmanız gerekir.
  • option_value: Seçeneği ayarlamak için sabit bir ifade . Değişmez değerleri ve skalar fonksiyonları kabul eder.

İadeler

Aşağıdaki şemaya sahip bir Apache Kafka kümesinden okunan kayıtlar:

  • key BINARY: Kafka kaydının anahtarı.
  • value BINARY NOT NULL: Kafka kaydının değeri.
  • topic STRING NOT NULL: Kaydın okunduğu Kafka konusunun adı.
  • partition INT NOT NULL: Kaydın okunduğu Kafka bölümünün kimliği.
  • offset BIGINT NOT NULL: Kafka kaydının TopicPartition ofset numarası.
  • timestamp TIMESTAMP NOT NULL: Kayıt için bir zaman damgası değeri. timestampType sütunu bu zaman damgasının neye karşılık olduğunu tanımlar.
  • timestampType INTEGER NOT NULL: timestamp sütununda belirtilen zaman damgasının türü.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Kayıtın bir parçası olarak sağlanan başlık (header) değerleri (etkin olduğunda).

Örnekler

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Seçenekler

Seçeneklerin ayrıntılı listesini Seçenekler'de bulabilirsiniz.

Gerekli seçenekler

Kafka kümenize bağlanmak için aşağıdaki seçeneği belirtin.

Seçenek
bootstrapServers
Tür: String
Kafka kümesine işaret eden konak/bağlantı noktası çiftlerinin virgülle ayrılmış listesi.
Varsayılan değer: Yok

Veri çekebileceğiniz Kafka konularını yapılandırmak için aşağıdaki seçeneklerden yalnızca birini belirtin.

Seçenek
assign
Tür: String
Kullanılacak belirli konu bölümlerini içeren bir JSON dizesi. Örneğin, '{"topicA":[0,1],"topicB":[2,4]}' için topicA'nın 0'ıncı ve 1'inci bölümlerinden tüketilecektir.
Varsayılan değer: Yok
subscribe
Tür: String
Okunacak Kafka konularının virgülle ayrılmış listesi.
Varsayılan değer: Yok
subscribePattern
Tür: String
Abone olunacak konuları eşleştiren düzenli ifade.
Varsayılan değer: Yok

Çeşitli seçenekler

read_kafka toplu sorgularda ve akış sorgularında kullanılabilir. Aşağıdaki seçenekler, hangi sorgu türüne uygulanacağını belirtir.

Seçenek
endingOffsets
Tür: String Sorgu tipi: sadece toplu
Bir toplu sorguda okunacak konumlar, en son kayıtları belirtmek için "latest" veya her bir TopicPartition için bitiş konumunu belirten bir JSON dizesidir. JSON'da, en son durumu belirtmek için bir ofset olarak -1 kullanılabilir. "Ofset olarak -2 (en erken) izin verilmiyor."
Varsayılan değer: "latest"
endingOffsetsByTimestamp
Tür: String Sorgu tipi: sadece toplu
Her TopicPartition için okunacak bitiş zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, 1970-01-01 00:00:00 UTC tarihinden itibaren milisaniye cinsinden uzun bir değer olarak sağlanması gerekir, örneğin.
1686444353000. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın.
endingOffsetsByTimestamp ' den önceliklidir endingOffsets.
Varsayılan değer: Yok
endingTimestamp
Tür: String Sorgu tipi: sadece toplu
Bu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri
1970-01-01 00:00:00 UTC, örneğin "1686444353000". Kafka eşleşen uzaklığı döndürmezse, uzaklık en son olarak ayarlanır. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Not: endingTimestamp, endingOffsetsByTimestamp'e göre önceliklidir.
endingOffsets.
Varsayılan değer: Yok
includeHeaders
Tür: Boolean Sorgu Türü: akış ve yığın işleme
Kafka üst bilgilerinin satıra eklenip eklenmeyeceği.
Varsayılan değer: false
kafka.<consumer_option>
Tür: String Sorgu Türü: akış ve yığın işleme
Kafka tüketicisine özgü herhangi bir seçenek, kafka. öneki ile birlikte geçirilebilir. Bu seçenekler verildiğinde ters tırnak işaretleriyle çevrelenmelidir, aksi takdirde ayrıştırıcı hatası alırsınız. Seçenekleri Kafka belgelerinde bulabilirsiniz.
Not: Bu işlevle aşağıdaki seçenekleri ayarlamamalısınız:
key.deserializer, value.deserializer, bootstrap.servers, group.id
Varsayılan değer: Yok
maxOffsetsPerTrigger
Tür: Long Sorgu Türü: yalnızca akış
Tetikleme aralığı başına işlenecek maksimum offset veya satır sayısı için hız sınırı. Belirtilen toplam offset sayısı, TopicPartitions arasında orantılı olarak bölünecek.
Varsayılan değer: Yok
startingOffsets
Tür: String Sorgu Türü: akış ve yığın işleme
Sorgunun başlatıldığı başlangıç noktası, en erken uzaklıklardan gelen "earliest", yalnızca en son uzaklıklardan gelen "latest" veya her TopicPartition için başlangıç uzaklığını belirten bir JSON dizesidir. JSON'da, başlangıç noktası olarak -2 en erken zamana, -1 en son zamana başvurmak için kullanılabilir.
Not: Toplu sorgular için en son (örtük olarak veya JSON'da -1 kullanılarak) izin verilmez. Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar.
Varsayılan değer: "latest" streaming için, "earliest" batch için
startingOffsetsByTimestamp
Tür: String Sorgu Türü: akış ve yığın işleme
Her TopicPartition için başlangıç zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, 1970-01-01 00:00:00 UTC tarihinden itibaren, örneğin 1686444353000, milisaniye cinsinden uzun bir değer olarak sağlanması gerekir. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış startingOffsetsByTimestampStrategy seçeneğinin değerine bağlı olacak.
startingOffsetsByTimestamp ' den önceliklidir startingOffsets.
Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar.
Varsayılan değer: Yok
startingOffsetsByTimestampStrategy
Tür: String Sorgu Türü: akış ve yığın işleme
Bu strateji, zaman damgasına göre belirtilen başlangıç uzaklığı (genel veya bölüm başına) Kafka'nın döndürdüğü uzaklıkla eşleşmediğinde kullanılır. Kullanılabilir stratejiler şunlardır:
  • "error": sorgu başarısız
  • "latest": Spark'ın daha sonraki mikro toplu işlemlerde bu bölümlerden daha yeni kayıtları okuyabilmesi için bu bölümler için en son uzaklığı atar.

Varsayılan değer: "error"
startingTimestamp
Tür: String Sorgu Türü: akış ve yığın işleme
Bu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri
1970-01-01 00:00:00 UTC, örneğin "1686444353000". Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış startingOffsetsByTimestampStrategy seçeneğinin değerine bağlı olacak.
startingTimestampve startingOffsetsByTimestampüzerinde startingOffsets önceliklidir.
Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Bir sorgu sırasında yeni keşfedilen bölümler daima en erken başlar.
Varsayılan değer: Yok

Not

Her bölüm için döndürülen uzaklık, zaman damgası ilgili bölümde verilen zaman damgasından büyük veya buna eşit olan en erken uzaklıktır. Kafka eşleşen uzaklığı döndürmezse davranış seçenekler arasında değişir. Her seçeneğin açıklamasını denetleyin.

Spark, zaman damgası bilgilerini basitçe KafkaConsumer.offsetsForTimes öğesine iletir ve değeri yorumlamaz veya üzerinde düşünmez. hakkında KafkaConsumer.offsetsForTimesdaha fazla bilgi için lütfen belgelere bakın. Ayrıca, burada zaman damgasının anlamı Kafka yapılandırmasına (log.message.timestamp.type) göre değişebilir. Ayrıntılar için Apache Kafka belgelerine bakın.