Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Kafka'nın Azure Databricks ile kullanılması hakkında sık sorulan sorular.
Kafka seçeneğinin desteklenmediği veya tanınmadığı hatasını neden alıyorum?
Kafka yerel yapılandırma seçeneklerini ayarlarken ön eki unutmak kafka. yaygın bir hatadır. Doğrudan Kafka istemcisine geçirilen tüm seçeneklerin ön eki kafka.olmalıdır:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
Spark Kafka bağlayıcısına özgü seçenekler (, , subscribegibistartingOffsetsmaxOffsetsPerTrigger) ön ek gerekmez.
Tam liste seçeneklerine bakın.
Gölgeli Kafka sınıfları hakkında neden hata alıyorum?
Azure Databricks gölgeli Kafka sınıfları (kafkashaded. veya shadedmskiam. ön ekli) kullanılmasını gerektirir. gibi RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCEDhatalar görürseniz, gölgeli sınıf adlarını kullanmanız gerekir:
-
org.apache.kafka.*sınıflarkafkashaded.ön eki gerektirir. Örneğin:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*sınıflarshadedmskiam.ön eki gerektirir. Örneğin:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
Kafka'ya bağlanırken neden bir TimeoutException alıyorum?
Yaygın nedenler şunlardır:
- Ağ bağlantısı: İşlem kümesi Kafka aracılarına erişemiyor. Güvenlik duvarı kurallarını, güvenlik gruplarını ve VPC yapılandırmalarını denetleyin.
-
Yanlış önyükleme sunucuları: Sunucu adı ve bağlantı noktasını
kafka.bootstrap.serverskontrol edin. - DNS çözümleme: Kafka aracısı ana bilgisayar adlarının Azure Databricks ağından çözümlenebildiğinden emin olun.
- SSL/TLS sorunları: SSL kullanıyorsanız sertifikaların doğru yapılandırıldığını doğrulayın.
Private Link veya VPC eşleme kurulumları için doğru ağ yollarının olduğundan emin olun.
Kafka için toplu iş mi yoksa akış modu mu kullanmalıyım?
Kullanım örneğine bağlıdır:
-
Akış modu (
spark.readStream): Sürekli veri işlemeye veya düşük gecikme süreli alıma ihtiyacınız olduğunda kullanın. -
Toplu veri modu (
spark.read): Tek seferlik veri yüklemeleri, geriye dönük veri doldurma veya hata ayıklama amaçları için kullanın. Her ikisi de gerektirir:startingOffsetsveendingOffsets.
Bkz.
Tek bir akışta birden çok Kafka konu başlığından okuyabilir miyim?
Evet, şunları kullanabilirsiniz:
-
subscribe: Virgülle ayrılmış bir konu listesi sağlayın, örneğin.option("subscribe", "topic1,topic2"). -
subscribePattern: Konu adlarını eşleştirmek için Java bir regex deseni kullanın, örneğin.option("subscribePattern", "topic-.*").
Kafka'ı Lakeflow Spark Bildirimli İşlem Hatları ile nasıl kullanabilirim?
Lakeflow Spark Bildirimli İşlem Hatları Kafka kaynakları için yerel destek sağlar. Kafka'dan okuyan bir akış tablosu tanımlayabilirsiniz:
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
Lakeflow Spark Bildirimli İşlem Hatlarında akış kaynakları hakkında daha fazla bilgi için bkz. İşlem hatlarına veri yükleme .
Kafka anahtarı ve değer sütunları nasıl deserilize edilir?
key ve value sütunları ikili (BINARYtür) olarak döndürülür. Veri biçiminize göre seri durumdan çıkarmak için DataFrame işlemlerini kullanın:
-
Dize verileri: İkiliyi dizeye dönüştürmek için kullanın
cast("string"). -
JSON verileri: Dizeye dönüştürmeden sonra kullanın
from_json(). bkz.from_jsonişlevi. -
Avro verileri: Avro ile kodlanmış verilerin seri durumdan çıkarılması için kullanın
from_avro(). Bkz Avro verilerini akış olarak okuma ve yazma. -
Protokol arabellekleri: Protobuf verilerini seri durumdan çıkarmak için kullanın
from_protobuf(). Bkz. Okuma ve yazma protokolü arabellekleri.
Neden idempotent yazma hatası alıyorum?
Databricks Runtime 13.3 LTS ve üstü, kafka-clients kütüphanesinin varsayılan olarak idempotent yazmaları etkinleştiren daha yeni bir sürümünü içerir. Kafka kümeniz yapılandırılmış ancak etkinleştirilmemiş IDEMPOTENT_WRITE ACL'lerle 2.8.0 veya üzeri bir sürüm kullanıyorsa, yazma işlemi şu şekilde başarısız olur: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state.
Kafka sürümünü 2.8.0 veya daha üstüne yükselterek veya Yapılandırılmış Akış yazıcınızı yapılandırırken .option("kafka.enable.idempotence", "false") ayarı yaparak bu hatayı düzeltin.
KAFKA_DATA_LOSS_ERROR Nedir ve bunu nasıl çözebilirim?
Bu hata, Kafka kaynağı denetim noktasında depolanan ofsetlerin Kafka'da artık kullanılamadığını algıladığında oluşur, çünkü genellikle:
- Veri akışı, Kafka saklama süresinden daha uzun süreli duraklatıldı.
- Kafka konu verileri silindi veya konu yeniden oluşturuldu.
- Kafka aracısı veri kaybıyla karşılaşmıştır.
Bunu çözmek için:
-
Veri kaybı kabul edilebilirse: Akışın en erken kullanılabilir uzaklıktan devam etmesini sağlamak için
.option("failOnDataLoss", "false")ayarlayın. -
Veri kaybı kabul edilebilir değilse: Denetim noktasını sıfırlayın ve uzaklıklardan
earliestyeniden işleyip eksik Kafka verilerini geri yükleyin.
Daha fazla bilgi için bkz. KAFKA_DATA_LOSS hata koşulu .
Kafka'dan verilerin okunma hızını nasıl denetleyebilirim?
Mikro toplu iş başına işlenen sapma (yaklaşık kayıt) sayısını sınırlamak için maxOffsetsPerTrigger seçeneğini kullanın. Bu, sonraki işlemleri bunaltabilecek veya bekleyen işleri telafi ederken bellek sorunlarına neden olabilecek büyük toplu işlemleri önlemeye yardımcı olur.
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
Alternatif olarak, her toplu işlem için kaç Spark bölümü oluşturulduğunu denetlemek için veya minPartitions gibi maxRecordsPerPartition seçenekleri kullanın.
En son Kafka uzaklıklarından akışımın ne kadar gerisinde olduğunu nasıl izleyebilirim?
Akış sorgusu ilerlemesinde bulunan avgOffsetsBehindLatest, maxOffsetsBehindLatest ve minOffsetsBehindLatest ölçümlerini kullanın. Abone olunan tüm konu bölümleri üzerinden akışınız, ne kadar sapmanın en son kullanılabilir sapmanın ardında olduğunu bildirir. Bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.
Ayrıca, henüz işlenmemiş toplam veri baytlarını tahmin etmek için de kullanabilirsiniz estimatedTotalBytesBehindLatest .
Databricks Runtime 17.1'e yükselttikten sonra Kafka uzaklık gecikmesi ölçümlerim neden kalıcı sıfır olmayan değerler gösteriyor?
Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.
Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .
Değerler büyükse veya sürekli büyüyorsa, akış gelen verilere ayak uydurmuyor olabilir. Bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.
Kafka akışı başlatmam neden yavaş?
Kafka akışları için şunlar için zaman gerekir:
- Kafka kümesine bağlanın ve meta verileri getirin.
- Konu bölümlerini keşfedin.
- İlk uzaklıkları getirme.
Şirket içi veya uzak Kafka kümeleri için ağ gecikme süresi başlatma süresini önemli ölçüde etkileyebilir. Tetiklenen/zamanlanmış işlem hatlarını sık sık yeniden başlatmalarla çalıştırıyorsanız, yinelenen başlatma ek yükünü önlemek için sürekli akış modunu kullanmayı göz önünde bulundurun.
Neden daha fazla Spark yürütücüsü eklemek Kafka aktarım hızımı artırmıyor?
Kafka aracıları doygunluğa ulaştığında, daha fazla Spark yürütücüsü eklemek, aktarım hızını artırmadan maliyeti artırır.
Kafka’nın darboğaz olduğunu gösteren işaretler:
- Daha fazla çekirdek eklemeye rağmen işlem hacmi sabitleşir.
- Kafka aracısı CPU veya ağ kullanımı yüksektir.
- Spark görevleri hızla tamamlar ancak yeni verileri bekler.
Bu sorunu çözmek için aracı ekleyerek veya yükü dağıtmak için bölüm sayısını artırarak Kafka kümenizi ölçeklendirin.
Kafka akışı için maliyet ve işlem kullanımını nasıl iyileştirebilirim?
Mikro toplu iş ve AvailableNow modları için:
- Kümenizi doğru boyutlandırın: Ölçümleri izleyin ve en yoğun yük için uygun bir sabit küme boyutu ayarlayın.
- Yükleme ani artışları sırasında kaynak kullanımını denetlemek için : Toplu iş boyutlarını sınırla seçeneğini kullanın
maxOffsetsPerTrigger. - Otomatik ölçeklendirmeden kaçının: Akış işleri sürekli çalışır ve düğümleri eklemek veya kaldırmak, görevin yeniden dengeleme ek yüküne neden olur.
-
Veri dengesizliği azaltma: Dengesiz bölümler, bazı görevlerin diğerlerinden önemli ölçüde daha fazla veri işlemesine neden olur ve bu da toplu işlemin tamamlanmasını yavaşlatan ve boşta kalan görevlerde işlem kaynaklarını boşa harcayan sapmalara neden olur.
minPartitionsDaha dengeli işleme için büyük Kafka bölümlerini daha küçük Spark bölümlerine bölme seçeneğini kullanın.
Gerçek zamanlı mod için doğru boyutlandırma özellikle önemlidir çünkü görevler verileri beklerken boşta kalabilir. Dikkat edilmesi gereken temel noktalar
-
maxPartitionsHer görevin, birden çok Kafka bölümünü işlemesini sağlayarak ek yükü azaltacak şekilde ayarlayın. -
spark.sql.shuffle.partitionsöğesini karıştırma yoğunluğu yüksek işler için ayarlayın.
Gerçek zamanlı mod için küme boyutlandırma yönergeleri için bkz. İşlem boyutlandırma .
Konu başlığında veriler mevcut olmasına rağmen akışım neden kayıt döndürmedi?
Yaygın nedenler şunlardır:
-
Yanlış
startingOffsetsayar: Varsayılan değer, yalnızca akış başladıktan sonra gelen yeni verileri okuyan değeridirlatest.startingOffsetsöğesini mevcut verileri okumak içinearliestolarak ayarlayın. - Yanlış konu adı: Doğru konuya abone olduğunuzu doğrulayın.
- Kimlik doğrulama sorunları: Akışınız başarıyla bağlanmış olabilir ancak konu başlığından okuma iznine sahip değil. Kafka ACL'lerinizi denetleyin.
-
Ofset süresi dolması: Akışınız uzun süre durdurulduysa ve kontrol noktasındaki ofsetlerin süresi dolmuşsa (Kafka saklama süresi tarafından silinmişse), kontrol noktasını sıfırlamanız veya ayarlamanız
failOnDataLossgerekebilir.