Sıkça Sorulan Sorular

Kafka'nın Azure Databricks ile kullanılması hakkında sık sorulan sorular.

Kafka seçeneğinin desteklenmediği veya tanınmadığı hatasını neden alıyorum?

Kafka yerel yapılandırma seçeneklerini ayarlarken ön eki unutmak kafka. yaygın bir hatadır. Doğrudan Kafka istemcisine geçirilen tüm seçeneklerin ön eki kafka.olmalıdır:

# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")

# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

Spark Kafka bağlayıcısına özgü seçenekler (, , subscribegibistartingOffsetsmaxOffsetsPerTrigger) ön ek gerekmez. Tam liste seçeneklerine bakın.

Gölgeli Kafka sınıfları hakkında neden hata alıyorum?

Azure Databricks gölgeli Kafka sınıfları (kafkashaded. veya shadedmskiam. ön ekli) kullanılmasını gerektirir. gibi RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDhatalar görürseniz, gölgeli sınıf adlarını kullanmanız gerekir:

  • org.apache.kafka.* sınıflar kafkashaded. ön eki gerektirir. Örneğin: kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
  • software.amazon.msk.* sınıflar shadedmskiam. ön eki gerektirir. Örneğin: shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule

Kafka'ya bağlanırken neden bir TimeoutException alıyorum?

Yaygın nedenler şunlardır:

  • Ağ bağlantısı: İşlem kümesi Kafka aracılarına erişemiyor. Güvenlik duvarı kurallarını, güvenlik gruplarını ve VPC yapılandırmalarını denetleyin.
  • Yanlış önyükleme sunucuları: Sunucu adı ve bağlantı noktasını kafka.bootstrap.servers kontrol edin.
  • DNS çözümleme: Kafka aracısı ana bilgisayar adlarının Azure Databricks ağından çözümlenebildiğinden emin olun.
  • SSL/TLS sorunları: SSL kullanıyorsanız sertifikaların doğru yapılandırıldığını doğrulayın.

Private Link veya VPC eşleme kurulumları için doğru ağ yollarının olduğundan emin olun.

Kafka için toplu iş mi yoksa akış modu mu kullanmalıyım?

Kullanım örneğine bağlıdır:

  • Akış modu (spark.readStream): Sürekli veri işlemeye veya düşük gecikme süreli alıma ihtiyacınız olduğunda kullanın.
  • Toplu veri modu (spark.read): Tek seferlik veri yüklemeleri, geriye dönük veri doldurma veya hata ayıklama amaçları için kullanın. Her ikisi de gerektirir: startingOffsets ve endingOffsets.

Bkz. Yapılandırılmış Akış tetikleyici aralıklarını yapılandırma ve , , Gerçek zamanlı mod gibi tetikleyici aralıklarını yapılandırma ayrıntıları için.

Tek bir akışta birden çok Kafka konu başlığından okuyabilir miyim?

Evet, şunları kullanabilirsiniz:

  • subscribe: Virgülle ayrılmış bir konu listesi sağlayın, örneğin .option("subscribe", "topic1,topic2").
  • subscribePattern: Konu adlarını eşleştirmek için Java bir regex deseni kullanın, örneğin .option("subscribePattern", "topic-.*").

Kafka'ı Lakeflow Spark Bildirimli İşlem Hatları ile nasıl kullanabilirim?

Lakeflow Spark Bildirimli İşlem Hatları Kafka kaynakları için yerel destek sağlar. Kafka'dan okuyan bir akış tablosu tanımlayabilirsiniz:

Python

import dlt

@dlt.table
def kafka_bronze():
  return (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>'
);

Lakeflow Spark Bildirimli İşlem Hatlarında akış kaynakları hakkında daha fazla bilgi için bkz. İşlem hatlarına veri yükleme .

Kafka anahtarı ve değer sütunları nasıl deserilize edilir?

key ve value sütunları ikili (BINARYtür) olarak döndürülür. Veri biçiminize göre seri durumdan çıkarmak için DataFrame işlemlerini kullanın:

Neden idempotent yazma hatası alıyorum?

Databricks Runtime 13.3 LTS ve üstü, kafka-clients kütüphanesinin varsayılan olarak idempotent yazmaları etkinleştiren daha yeni bir sürümünü içerir. Kafka kümeniz yapılandırılmış ancak etkinleştirilmemiş IDEMPOTENT_WRITE ACL'lerle 2.8.0 veya üzeri bir sürüm kullanıyorsa, yazma işlemi şu şekilde başarısız olur: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.

Kafka sürümünü 2.8.0 veya daha üstüne yükselterek veya Yapılandırılmış Akış yazıcınızı yapılandırırken .option("kafka.enable.idempotence", "false") ayarı yaparak bu hatayı düzeltin.

KAFKA_DATA_LOSS_ERROR Nedir ve bunu nasıl çözebilirim?

Bu hata, Kafka kaynağı denetim noktasında depolanan ofsetlerin Kafka'da artık kullanılamadığını algıladığında oluşur, çünkü genellikle:

  • Veri akışı, Kafka saklama süresinden daha uzun süreli duraklatıldı.
  • Kafka konu verileri silindi veya konu yeniden oluşturuldu.
  • Kafka aracısı veri kaybıyla karşılaşmıştır.

Bunu çözmek için:

  • Veri kaybı kabul edilebilirse: Akışın en erken kullanılabilir uzaklıktan devam etmesini sağlamak için .option("failOnDataLoss", "false") ayarlayın.
  • Veri kaybı kabul edilebilir değilse: Denetim noktasını sıfırlayın ve uzaklıklardan earliest yeniden işleyip eksik Kafka verilerini geri yükleyin.

Daha fazla bilgi için bkz. KAFKA_DATA_LOSS hata koşulu .

Kafka'dan verilerin okunma hızını nasıl denetleyebilirim?

Mikro toplu iş başına işlenen sapma (yaklaşık kayıt) sayısını sınırlamak için maxOffsetsPerTrigger seçeneğini kullanın. Bu, sonraki işlemleri bunaltabilecek veya bekleyen işleri telafi ederken bellek sorunlarına neden olabilecek büyük toplu işlemleri önlemeye yardımcı olur.

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>',
  maxOffsetsPerTrigger => '10000'
);

Alternatif olarak, her toplu işlem için kaç Spark bölümü oluşturulduğunu denetlemek için veya minPartitions gibi maxRecordsPerPartition seçenekleri kullanın.

En son Kafka uzaklıklarından akışımın ne kadar gerisinde olduğunu nasıl izleyebilirim?

Akış sorgusu ilerlemesinde bulunan avgOffsetsBehindLatest, maxOffsetsBehindLatest ve minOffsetsBehindLatest ölçümlerini kullanın. Abone olunan tüm konu bölümleri üzerinden akışınız, ne kadar sapmanın en son kullanılabilir sapmanın ardında olduğunu bildirir. Bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.

Ayrıca, henüz işlenmemiş toplam veri baytlarını tahmin etmek için de kullanabilirsiniz estimatedTotalBytesBehindLatest .

Databricks Runtime 17.1'e yükselttikten sonra Kafka uzaklık gecikmesi ölçümlerim neden kalıcı sıfır olmayan değerler gösteriyor?

Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.

Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .

Değerler büyükse veya sürekli büyüyorsa, akış gelen verilere ayak uydurmuyor olabilir. Bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.

Kafka akışı başlatmam neden yavaş?

Kafka akışları için şunlar için zaman gerekir:

  1. Kafka kümesine bağlanın ve meta verileri getirin.
  2. Konu bölümlerini keşfedin.
  3. İlk uzaklıkları getirme.

Şirket içi veya uzak Kafka kümeleri için ağ gecikme süresi başlatma süresini önemli ölçüde etkileyebilir. Tetiklenen/zamanlanmış işlem hatlarını sık sık yeniden başlatmalarla çalıştırıyorsanız, yinelenen başlatma ek yükünü önlemek için sürekli akış modunu kullanmayı göz önünde bulundurun.

Neden daha fazla Spark yürütücüsü eklemek Kafka aktarım hızımı artırmıyor?

Kafka aracıları doygunluğa ulaştığında, daha fazla Spark yürütücüsü eklemek, aktarım hızını artırmadan maliyeti artırır.

Kafka’nın darboğaz olduğunu gösteren işaretler:

  • Daha fazla çekirdek eklemeye rağmen işlem hacmi sabitleşir.
  • Kafka aracısı CPU veya ağ kullanımı yüksektir.
  • Spark görevleri hızla tamamlar ancak yeni verileri bekler.

Bu sorunu çözmek için aracı ekleyerek veya yükü dağıtmak için bölüm sayısını artırarak Kafka kümenizi ölçeklendirin.

Kafka akışı için maliyet ve işlem kullanımını nasıl iyileştirebilirim?

Mikro toplu iş ve AvailableNow modları için:

  • Kümenizi doğru boyutlandırın: Ölçümleri izleyin ve en yoğun yük için uygun bir sabit küme boyutu ayarlayın.
  • Yükleme ani artışları sırasında kaynak kullanımını denetlemek için : Toplu iş boyutlarını sınırla seçeneğini kullanınmaxOffsetsPerTrigger.
  • Otomatik ölçeklendirmeden kaçının: Akış işleri sürekli çalışır ve düğümleri eklemek veya kaldırmak, görevin yeniden dengeleme ek yüküne neden olur.
  • Veri dengesizliği azaltma: Dengesiz bölümler, bazı görevlerin diğerlerinden önemli ölçüde daha fazla veri işlemesine neden olur ve bu da toplu işlemin tamamlanmasını yavaşlatan ve boşta kalan görevlerde işlem kaynaklarını boşa harcayan sapmalara neden olur. minPartitions Daha dengeli işleme için büyük Kafka bölümlerini daha küçük Spark bölümlerine bölme seçeneğini kullanın.

Gerçek zamanlı mod için doğru boyutlandırma özellikle önemlidir çünkü görevler verileri beklerken boşta kalabilir. Dikkat edilmesi gereken temel noktalar

  • maxPartitions Her görevin, birden çok Kafka bölümünü işlemesini sağlayarak ek yükü azaltacak şekilde ayarlayın.
  • spark.sql.shuffle.partitions öğesini karıştırma yoğunluğu yüksek işler için ayarlayın.

Gerçek zamanlı mod için küme boyutlandırma yönergeleri için bkz. İşlem boyutlandırma .

Konu başlığında veriler mevcut olmasına rağmen akışım neden kayıt döndürmedi?

Yaygın nedenler şunlardır:

  • Yanlış startingOffsets ayar: Varsayılan değer, yalnızca akış başladıktan sonra gelen yeni verileri okuyan değeridir latest. startingOffsets öğesini mevcut verileri okumak için earliest olarak ayarlayın.
  • Yanlış konu adı: Doğru konuya abone olduğunuzu doğrulayın.
  • Kimlik doğrulama sorunları: Akışınız başarıyla bağlanmış olabilir ancak konu başlığından okuma iznine sahip değil. Kafka ACL'lerinizi denetleyin.
  • Ofset süresi dolması: Akışınız uzun süre durdurulduysa ve kontrol noktasındaki ofsetlerin süresi dolmuşsa (Kafka saklama süresi tarafından silinmişse), kontrol noktasını sıfırlamanız veya ayarlamanız failOnDataLoss gerekebilir.