Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu makalede, Azure Databricks üzerinde Yapılandırılmış Akış iş yüklerini çalıştırırken Apache Kafka'nın kaynak veya havuz olarak nasıl kullanılabileceği açıklanmaktadır.
Kafka hakkında daha fazla bilgi için Apache Kafka belgelerine bakın.
Kafka'dan veri okuma
Azure Databricks, Kafka bağlantılarını yapılandırmak için veri biçimi olarak kafka anahtar sözcüğünü sağlar. Aşağıda bir akış okuma örneği verilmiştir:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
SQL
CREATE OR REFRESH STREAMING TABLE <table_name> AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>'
);
Azure Databricks, aşağıdaki örnekte gösterildiği gibi toplu okuma semantiğini de destekler:
Python
df = (spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
Scala
val df = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<server:ip>',
subscribe => '<topic>',
startingOffsets => 'earliest',
endingOffsets => 'latest'
);
Artımlı toplu yükleme için Databricks, Trigger.AvailableNow ile Kafka'nın kullanılmasını önerir. Bkz AvailableNow. Artımlı toplu işlem.
Databricks Runtime 13.3 LTS ve üzerinde, Azure Databricks Kafka verilerini okumak için bir SQL işlevi de sağlar. SQL ile akış yalnızca Lakeflow Spark Bildirimli İşlem Hatlarında veya Databricks SQL'deki akış tablolarında desteklenir. Bakınız read_kafka tablo değerli fonksiyon.
Kafka Yapılandırılmış Akış okuyucusu yapılandırma
Hem toplu hem de akış sorguları için Kafka kaynağı için aşağıdaki seçenek ayarlanmalıdır:
| Seçenek | Değer | Açıklama |
|---|---|---|
kafka.bootstrap.servers |
Virgülle ayrılmış host:port listesi | Kafka kümesi önyükleme sunucuları |
Ayrıca, abone olunacak konuları belirtmek için aşağıdaki seçeneklerden biri gereklidir:
| Seçenek | Değer | Açıklama |
|---|---|---|
subscribe |
Virgülle ayrılmış konu listesi. | Abone olunacak konu listesi. |
subscribePattern |
Java regex dizesi. | Konulara abone olmak için kullanılan desen. |
assign |
JSON dizesi {"topicA":[0,1],"topic":[2,4]}. |
Tüketime özel topicPartitions. |
Kullanılabilir seçeneklerin tam listesi için Seçenekler sayfasına bakın.
Kafka kayıtları şeması
Kafka Yapılandırılmış Akış okuyucusu tarafından döndürülen kayıtlar aşağıdaki şemaya sahip olacaktır:
| Köşe yazısı | Türü |
|---|---|
key |
binary |
value |
binary |
topic |
string |
partition |
int |
offset |
long |
timestamp |
long |
timestampType |
int |
key ve value, her zaman ByteArrayDeserializer ile bayt dizisi olarak seri durumdan çıkarılır. Anahtarları ve değerleri açıkça seri durumdan çıkarmak için DataFrame işlemlerini (veya cast("string")gibifrom_avro) kullanın.
Kafka'ya veri yazma
Aşağıda Kafka'ya akış yazma örneği verilmiştir:
Python
(df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Scala
df.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
Azure Databricks, aşağıdaki örnekte gösterildiği gibi Kafka veri havuzlarına toplu yazma semantiğini de destekler:
Python
(df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Scala
df.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
Kafka Yapılandırılmış Akış yazıcısını yapılandırma
Önemli
Databricks Runtime 13.3 LTS ve üstü, kafka-clients kütüphanesinin varsayılan olarak idempotent yazmaları etkinleştiren daha yeni bir sürümünü içerir. Kafka havuzu, yapılandırılmış ACL'lerle birlikte 2.8.0 veya daha düşük bir sürüm kullanıyorsa ancak IDEMPOTENT_WRITE etkinleştirilmediyse, yazma işlemi org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state hata iletisiyle başarısız olur.
Kafka sürümünü 2.8.0 veya daha üstüne yükselterek veya Yapılandırılmış Akış yazıcınızı yapılandırırken .option(“kafka.enable.idempotence”, “false”) ayarı yaparak bu hatayı düzeltin.
Kafka'ya yazarken ayarlanan yaygın seçenekler şunlardır:
| Seçenek | Değer | Varsayılan değer | Açıklama |
|---|---|---|---|
kafka.boostrap.servers |
Virgülle ayrılmış <host:port> listesi |
yok | [Gerekli] Kafka bootstrap.servers yapılandırması. |
topic |
STRING |
ayarlanmadı | [İsteğe bağlı] Tüm satırların yazılacağı konuyu ayarlar. Bu seçenek, verilerde bulunan tüm konu sütunlarını geçersiz kılar. |
includeHeaders |
BOOLEAN |
false |
[İsteğe bağlı] Kafka üst bilgilerinin satıra eklenip eklenmeyeceği. |
Kullanılabilir seçeneklerin tam listesi için Seçenekler sayfasına bakın.
Kafka yazıcı şeması
Kafka'ya veri yazarken, sağlanan DataFrame aşağıdaki alanları içerebilir:
| Sütun adı | Gerekli veya isteğe bağlı | Türü |
|---|---|---|
key |
optional |
STRING veya BINARY |
value |
required |
STRING veya BINARY |
headers |
optional | ARRAY |
topic |
isteğe bağlı (eğer topic yazar seçeneği olarak ayarlanmışsa yoksayılır) |
STRING |
partition |
optional | INT |
Kimlik doğrulama
Azure Databricks; Unity Catalog hizmeti kimlik bilgileri, SASL/SSL ve AWS MSK, Azure Event Hubs ve Google Cloud Managed Kafka için buluta özgü seçenekler de dahil olmak üzere Kafka için birden çok kimlik doğrulama yöntemini destekler. Bkz. Kimlik doğrulaması.
Kafka ölçümlerini alma
, avgOffsetsBehindLatestve maxOffsetsBehindLatest ölçümlerini kullanarak bir akış sorgusunun Kafka'nın minOffsetsBehindLatestarkasında ne kadar gecikmeli olduğunu izleyebilirsiniz. Bunlar, Kafka'daki en son uzaklıklara göre abone olunan tüm konu bölümleri arasında ortalama, en yüksek ve en düşük uzaklık gecikmesini bildirir. Bkz Etkileşimli Ölçümleri Okuma.
Uyarı
Databricks Runtime 17.1 ve üzeri sürümlerde, her mikro toplu işlem tamamlandıktan sonra en son Kafka uzaklıkları getirilir. Sürekli veri alan konularda birikim ölçümleri küçük, kalıcı sıfırdan farklı değerler gösterebilir. Bu beklenen bir davranıştır ve akışın geride kaldığını göstermez.
Databricks Runtime 17.0 ve altında en son Kafka ofsetleri mikro toplu işin başlangıç zamanında getirilir. Akış sorguları mikro toplu iş başlangıcında kullanılabilen tüm kayıtları tutarlı bir şekilde tükettiğinde kapsam ölçümleri döndürülebilir 0 .
Sorgunun henüz ne kadar veri kullanmadığını tahmin etmek için ölçümü kullanın estimatedTotalBytesBehindLatest . Bu ölçüm, son 300 saniye içinde işlenen toplu işlemlere göre abone olunan tüm bölümlerde kalan toplam bayt sayısını tahmin eder. Seçeneğini ayarlayarak bytesEstimateWindowLength bu tahmin için kullanılan zaman penceresini değiştirebilirsiniz. Örneğin, 10 dakikaya ayarlamak için:
Python
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
Scala
val df = spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds
Akışı bir not defterinde çalıştırıyorsanız, akış sorgusu ilerleme durumu panosundaki Ham Veri sekmesinin altında şu ölçümleri görebilirsiniz:
{
"sources": [
{
"description": "KafkaV2[Subscribe[topic]]",
"metrics": {
"avgOffsetsBehindLatest": "4.0",
"maxOffsetsBehindLatest": "4",
"minOffsetsBehindLatest": "4",
"estimatedTotalBytesBehindLatest": "80.0"
}
}
]
}
Daha fazla bilgi için bkz. Azure Databricks üzerinde Yapılandırılmış Akış sorgularını izleme.
Kod örneği: Kafka'dan Delta'ya
Aşağıdaki örnekte, Kafka'dan Delta tablosuna sürekli veri akışı için eksiksiz bir iş akışı gösterilmektedir. Bu düzen neredeyse gerçek zamanlı veri alımı iş yükleri için idealdir.
Bu örnekte sabit bir JSON şeması kullanılır. Avro veya Protobuf gibi diğer biçimler için from_avro veya from_protobuf kullanın. Ayrıca bir şema kayıt defteriyle tümleştirebilirsiniz. Bkz. Schema Registry ile örnek.
Python
from pyspark.sql.functions import from_json, col
# Define simple JSON schemas for key and value
key_schema = "user_id STRING"
value_schema = "event_type STRING, event_ts TIMESTAMP"
# Configure Kafka options with service credentials
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9092",
"subscribe": "<topic-name>",
"databricks.serviceCredential": "<service-credential-name>",
}
# Read from Kafka and parse JSON
parsed_df = (spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
.select(
from_json(col("key").cast("string"), key_schema).alias("key"),
from_json(col("value").cast("string"), value_schema).alias("value")
)
.select("key.*", "value.*")
)
# Write to Delta table
query = (parsed_df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(processingTime="10 seconds")
.toTable("catalog.schema.events_table")
)
query.awaitTermination()
Scala
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.streaming.Trigger
// Define JSON schemas for key and value
val keySchema = "user_id STRING"
val valueSchema = "event_type STRING, event_ts TIMESTAMP"
// Configure Kafka options with service credentials
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9092",
"subscribe" -> "<topic-name>",
"databricks.serviceCredential" -> "<service-credential-name>"
)
// Read from Kafka and parse JSON
val parsedDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.select(
from_json(col("key").cast("string"), keySchema).alias("key"),
from_json(col("value").cast("string"), valueSchema).alias("value")
)
.select("key.*", "value.*")
// Write to Delta table
val query = parsedDF.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(Trigger.ProcessingTime("10 seconds"))
.toTable("catalog.schema.events_table")
query.awaitTermination()
SQL
-- Create a streaming table from Kafka using read_kafka
CREATE OR REFRESH STREAMING TABLE catalog.schema.events_table AS
SELECT
key::string:user_id AS user_id,
value::string:event_type AS event_type,
to_timestamp(value::string:event_ts) AS event_ts
FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9092',
subscribe => '<topic-name>',
serviceCredential => '<service-credential-name>'
);