Aracılığıyla paylaş


read_kafka tablo değerli işlev

Şunlar için geçerlidir: onay işareti evet olarak işaretlenmiş Databricks SQL onay işareti evet olarak işaretlenmiş Databricks Runtime 13.3 LTS ve üzeri

Apache Kafka kümesindeki verileri okur ve tablo biçiminde döndürür.

Bir veya daha fazla Kafka konu başlığındaki verileri okuyabilir. Hem toplu sorgular hem de akış alımını destekler.

Söz dizimi

read_kafka([option_key => option_value ] [, ...])

Bağımsız değişkenler

Bu işlev adlandırılmış parametre çağırması gerektirir.

  • option_key: Yapılandırılan seçeneğin adı. Nokta () içeren seçenekler için backticks. (') kullanmanız gerekir.
  • option_value: Seçeneği ayarlamak için sabit bir ifade. Değişmez değerleri ve skaler işlevleri kabul eder.

Döndürülenler

Aşağıdaki şemaya sahip bir Apache Kafka kümesinden okunan kayıtlar:

  • key BINARY: Kafka kaydının anahtarı.
  • value BINARY NOT NULL: Kafka kaydının değeri.
  • topic STRING NOT NULL: Kaydın okunduğu Kafka konusunun adı.
  • partition INT NOT NULL: Kaydın okunduğu Kafka bölümünün kimliği.
  • offset BIGINT NOT NULL: Kafka'daki TopicPartitionkaydın uzaklık numarası.
  • timestamp TIMESTAMP NOT NULL: Kayıt için bir zaman damgası değeri. sütun, timestampType bu zaman damgasının neye karşılık olduğunu tanımlar.
  • timestampType INTEGER NOT NULL: Sütunda belirtilen zaman damgasının timestamp türü.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Kaydın parçası olarak sağlanan üst bilgi değerleri (etkinse).

Örnekler

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Seçenekler

Seçeneklerin ayrıntılı listesini Apache Spark belgelerinde bulabilirsiniz.

Gerekli seçenekler

Kafka kümenize bağlanmak için aşağıdaki seçeneği belirtin.

Seçenek
bootstrapServers

Tür: String

Kafka kümesine işaret eden konak/bağlantı noktası çiftlerinin virgülle ayrılmış listesi.

Varsayılan değer: Yok

Veri çekebileceğiniz Kafka konularını yapılandırmak için aşağıdaki seçeneklerden yalnızca birini belirtin.

Seçenek
assign

Tür: String

Kullanılacak belirli konu bölümlerini içeren bir JSON dizesi. Örneğin, için '{"topicA":[0,1],"topicB":[2,4]}'topicA'nın 0 ve 1. bölümleri kullanılacaktır.

Varsayılan değer: Yok
subscribe

Tür: String

Okunacak Kafka konularının virgülle ayrılmış listesi.

Varsayılan değer: Yok
subscribePattern

Tür: String

Abone olunacak konularla eşleşen normal ifade.

Varsayılan değer: Yok

Çeşitli seçenekler

read_kafka toplu sorgularda ve akış sorgularında kullanılabilir. Aşağıdaki seçenekler, hangi sorgu türüne uygulanacağını belirtir.

Seçenek
endingOffsets

Tür: String Sorgu Türü: yalnızca toplu iş

En son kayıtları belirtmek üzere bir toplu iş sorgusuna "latest" kadar okunacak uzaklıklar veya her TopicPartition için bitiş uzaklığını belirten bir JSON dizesi. JSON'da, -1 en son sürüme başvurmak için uzaklık kullanılabilir. -2 uzaklık olarak (en erken) izin verilmez.

Varsayılan değer: "latest"
endingOffsetsByTimestamp

Tür: String Sorgu Türü: yalnızca toplu iş

Her TopicPartition için okunacak bitiş zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, örneğin tarihinden itibaren 1970-01-01 00:00:00 UTCmilisaniye cinsinden uzun bir zaman damgası değeri olarak sağlanması gerekir.
1686444353000. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın.
endingOffsetsByTimestamp ' den önceliklidir endingOffsets.

Varsayılan değer: Yok
endingTimestamp

Tür: String Sorgu Türü: yalnızca toplu iş

Bu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri
1970-01-01 00:00:00 UTC, örneğin "1686444353000". Kafka eşleşen uzaklığı döndürmezse, uzaklık en son olarak ayarlanır. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Not: endingTimestamp ve'den önceliklidir endingOffsetsByTimestamp
endingOffsets.

Varsayılan değer: Yok
includeHeaders

Tür: Boolean Sorgu Türü: akış ve toplu iş

Kafka üst bilgilerinin satıra eklenip eklenmeyeceği.

Varsayılan değer: false
kafka.<consumer_option>

Tür: String Sorgu Türü: akış ve toplu iş

Kafka tüketicisine özgü seçenekler ön ek ile geçirilebilir kafka. . Bu seçeneklerin sağlandığında arka uçlarla çevrelenmiş olması gerekir, aksi takdirde ayrıştırıcı hatası alırsınız. Seçenekleri Kafka belgelerinde bulabilirsiniz.

Not: Bu işlevle aşağıdaki seçenekleri ayarlamamalısınız:
key.deserializer, value.deserializer, bootstrap.servers, group.id

Varsayılan değer: Yok
maxOffsetsPerTrigger

Tür: Long Sorgu Türü: yalnızca akış

Tetikleyici aralığı başına işlenen en fazla uzaklık veya satır sayısı için hız sınırı. Belirtilen toplam uzaklık sayısı, TopicPartitions arasında orantılı olarak bölünecek.

Varsayılan değer: Yok
startingOffsets

Tür: String Sorgu Türü: akış ve toplu iş

Yalnızca en son uzaklıklardan gelen en eski uzaklıklardan "latest" veya her TopicPartition için başlangıç uzaklığını belirten bir JSON dizesinden oluşan sorgunun başlatıldığı "earliest" başlangıç noktası. JSON'da, -2 en erkene ve -1 en son sürüme başvurmak için bir uzaklık kullanılabilir.

Not: Toplu sorgular için en son (örtük olarak veya JSON'da -1 kullanılarak) izin verilmez. Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar.

Varsayılan değer: "latest" akış için, "earliest" toplu iş için
startingOffsetsByTimestamp

Tür: String Sorgu Türü: akış ve toplu iş

Her TopicPartition için başlangıç zaman damgasını belirten bir JSON dizesi. Zaman damgalarının, örneğin 1686444353000tarihinden itibaren 1970-01-01 00:00:00 UTCmilisaniye cinsinden uzun bir zaman damgası değeri olarak sağlanması gerekir. Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış seçeneğinin startingOffsetsByTimestampStrategydeğerini izler.
startingOffsetsByTimestamp ' den önceliklidir startingOffsets.

Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar.

Varsayılan değer: Yok
startingOffsetsByTimestampStrategy

Tür: String Sorgu Türü: akış ve toplu iş

Bu strateji, zaman damgasına göre belirtilen başlangıç uzaklığı (genel veya bölüm başına) kafka'nın döndürülen uzaklığıyla eşleşmediğinde kullanılır. Kullanılabilir stratejiler şunlardır:

* "error": sorguyu başarısızlığa uğratma
* "latest": Spark'ın daha sonraki mikro toplu işlemlerde bu bölümlerden daha yeni kayıtları okuyabilmesi için bu bölümler için en son uzaklığı atar.

Varsayılan değer: "error"
startingTimestamp

Tür: String Sorgu Türü: akış ve toplu iş

Bu tarihten bu yana milisaniye cinsinden zaman damgasının dize değeri
1970-01-01 00:00:00 UTC, örneğin "1686444353000". Zaman damgalarıyla ilgili davranış ayrıntılarıyla ilgili aşağıdaki nota bakın. Kafka eşleşen uzaklığı döndürmezse, davranış seçeneğinin startingOffsetsByTimestampStrategydeğerini izler.
startingTimestampve startingOffsetsüzerinde startingOffsetsByTimestamp önceliklidir.

Not: Akış sorguları için bu yalnızca yeni bir sorgu başlatıldığında geçerlidir. Yeniden başlatılan akış sorguları, sorgu denetim noktasında tanımlanan uzaklıklardan devam eder. Sorgu sırasında yeni bulunan bölümler en erken başlar.

Varsayılan değer: Yok

Not

Her bölüm için döndürülen uzaklık, zaman damgası ilgili bölümde verilen zaman damgasından büyük veya buna eşit olan en erken uzaklıktır. Kafka eşleşen uzaklığı döndürmezse davranış seçenekler arasında değişir. Her seçeneğin açıklamasını denetleyin.

Spark yalnızca zaman damgası bilgilerini öğesine KafkaConsumer.offsetsForTimesgeçirir ve değeri yorumlamaz veya neden göstermez. hakkında KafkaConsumer.offsetsForTimesdaha fazla bilgi için lütfen belgelere bakın. Ayrıca, burada zaman damgasının anlamı Kafka yapılandırmasına (log.message.timestamp.type) göre değişebilir. Ayrıntılar için Apache Kafka belgelerine bakın.