Aracılığıyla paylaş


İlk Yapılandırılmış Akış iş yükünüzü çalıştırma

Bu makalede, Azure Databricks'te ilk Yapılandırılmış Akış sorgularınızı çalıştırmak için gereken temel kavramların kod örnekleri ve açıklaması sağlanır. Neredeyse gerçek zamanlı ve artımlı işleme iş yükleri için Yapılandırılmış Akış'ı kullanabilirsiniz.

Yapılandırılmış Akış, Delta Live Tablolarında akış tablolarını destekleyen çeşitli teknolojilerden biridir. Databricks tüm yeni ETL, alım ve Yapılandırılmış Akış iş yükleri için Delta Live Tables kullanılmasını önerir. Bkz . Delta Live Tables nedir?.

Not

Delta Live Tables akış tablolarını bildirmek için biraz değiştirilmiş bir söz dizimi sağlarken, akış okumalarını ve dönüştürmelerini yapılandırmaya yönelik genel söz dizimi Azure Databricks'te tüm akış kullanım örnekleri için geçerlidir. Delta Live Tables ayrıca durum bilgilerini, meta verileri ve çok sayıda yapılandırmayı yöneterek akışı basitleştirir.

Nesne depolamadan akış verilerini okumak için Otomatik Yükleyici'yi kullanma

Aşağıdaki örnekte JSON verilerinin Otomatik Yükleyici ile yüklenmesi gösterilmektedir. Bu, biçim ve seçenekleri belirtmek için kullanılır cloudFiles . schemaLocation seçeneği şema çıkarımı ve evrimini etkinleştirir. Aşağıdaki kodu bir Databricks not defteri hücresine yapıştırın ve adlı raw_dfbir akış DataFrame oluşturmak için hücreyi çalıştırın:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Azure Databricks'te yapılan diğer okuma işlemleri gibi, bir akış okuma yapılandırması da veri yüklemez. Akış başlamadan önce verilerde bir eylem tetiklemeniz gerekir.

Not

Bir akış DataFrame'inde çağrılması display() bir akış işi başlatır. Çoğu Yapılandırılmış Akış kullanım örneğinde, akışı tetikleyen eylem havuza veri yazmak olmalıdır. Bkz . Yapılandırılmış Akış için üretimle ilgili dikkat edilmesi gerekenler.

Akış dönüştürmesi gerçekleştirme

Yapılandırılmış Akış, Azure Databricks ve Spark SQL'de kullanılabilen çoğu dönüştürmeyi destekler. Hatta MLflow modellerini UDF olarak yükleyebilir ve akış tahminlerini dönüştürme olarak yapabilirsiniz.

Aşağıdaki kod örneği, Spark SQL işlevlerini kullanarak alınan JSON verilerini ek bilgilerle zenginleştirmek için basit bir dönüştürme işlemini tamamlar:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Sonuçta transformed_df , veri kaynağına ulaşan her kaydı yüklemek ve dönüştürmek için sorgu yönergeleri yer alır.

Not

Yapılandırılmış Akış, veri kaynaklarını ilişkisiz veya sonsuz veri kümeleri olarak kabul eder. Bu nedenle, bazı dönüştürmeler sınırsız sayıda öğenin sıralanması gerekeceğinden Yapılandırılmış Akış iş yüklerinde desteklenmez.

Çoğu toplama ve birçok birleştirme, filigranlar, pencereler ve çıkış moduyla durum bilgilerinin yönetilmesini gerektirir. Bkz . Veri işleme eşiklerini denetlemek için filigranları uygulama.

Delta Lake'e artımlı toplu yazma gerçekleştirme

Aşağıdaki örnek, belirtilen dosya yolunu ve denetim noktasını kullanarak Delta Lake'e yazar.

Önemli

Yapılandırdığınız her akış yazıcısı için her zaman benzersiz bir denetim noktası konumu belirttiğinizden emin olun. Denetim noktası, akış sorgunuzla ilişkili işlenen tüm kayıtları ve durum bilgilerini izleyerek akışınız için benzersiz kimlik sağlar.

availableNow Tetikleyicinin ayarı Yapılandırılmış Akış'a kaynak veri kümesinden daha önce işlenmemiş tüm kayıtları işlemesini ve ardından kapatmasını bildirir; böylece bir akışı çalışır durumda bırakma konusunda endişelenmeden aşağıdaki kodu güvenle yürütebilirsiniz:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Bu örnekte, veri kaynağımıza yeni kayıt ulaşmadığından, bu kodun yinelenmiş yürütülmesi yeni kayıtları almaz.

Uyarı

Yapılandırılmış Akış yürütmesi, otomatik sonlandırmanın işlem kaynaklarını kapatmasını engelleyebilir. Beklenmeyen maliyetleri önlemek için akış sorgularını sonlandırdığından emin olun.

Delta Lake'ten veri okuma, dönüştürme ve Delta Lake'e yazma

Delta Lake hem kaynak hem de havuz olarak Yapılandırılmış Akış ile çalışmak için kapsamlı desteğe sahiptir. Bkz . Delta tablosu akış okuma ve yazma işlemleri.

Aşağıdaki örnekte, delta tablosundaki tüm yeni kayıtları artımlı olarak yüklemek, bunları başka bir Delta tablosunun anlık görüntüsüyle birleştirmek ve bir Delta tablosuna yazmak için örnek söz dizimi gösterilmektedir:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Kaynak tabloları okumak ve hedef tablolara ve belirtilen denetim noktası konumuna yazmak için yapılandırılmış uygun izinlere sahip olmanız gerekir. Veri kaynaklarınız ve havuzlarınız için ilgili değerleri kullanarak açılı ayraçlarla (<>) belirtilen tüm parametreleri doldurun.

Not

Delta Live Tables, Delta Lake işlem hatları oluşturmak için tam bildirim temelli bir söz dizimi sağlar ve tetikleyiciler ve denetim noktaları gibi özellikleri otomatik olarak yönetir. Bkz . Delta Live Tables nedir?.

Kafka'dan veri okuma, dönüştürme ve Kafka'ya yazma

Apache Kafka ve diğer mesajlaşma veri yolları, büyük veri kümeleri için en düşük gecikme sürelerinden bazılarını sağlar. Azure Databricks'i kullanarak Kafka'dan alınan verilere dönüştürmeler uygulayabilir ve sonra verileri Kafka'ya geri yazabilirsiniz.

Not

Bulut nesne depolama alanına veri yazmak ek gecikme yükü ekler. Delta Lake'te bir mesajlaşma veri yolu verilerini depolamak istiyorsanız ancak akış iş yükleri için mümkün olan en düşük gecikme süresini istiyorsanız Databricks, verileri lakehouse'a almak ve aşağı akış mesajlaşma veri yolu havuzları için neredeyse gerçek zamanlı dönüştürmeler uygulamak için ayrı akış işleri yapılandırmanızı önerir.

Aşağıdaki kod örneğinde, Verileri Delta tablosundaki verilerle birleştirip Kafka'ya geri yazarak Kafka'dan verileri zenginleştirmeye yönelik basit bir desen gösterilmektedir:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka hizmetinize erişim için yapılandırılmış uygun izinlere sahip olmanız gerekir. Veri kaynaklarınız ve havuzlarınız için ilgili değerleri kullanarak açılı ayraçlarla (<>) belirtilen tüm parametreleri doldurun. Bkz. Apache Kafka ve Azure Databricks ile akış işleme.