Spark yapılandırılmış akışı ile akış verilerini lakehouse'a alma
Yapılandırılmış Akış, Spark üzerinde oluşturulmuş ölçeklenebilir ve hataya dayanıklı bir akış işleme altyapısıdır. Spark, veriler gelmeye devam ettikçe akış işlemini artımlı ve sürekli olarak çalıştırmayı üstlenir.
Yapılandırılmış akış Spark 2.2'de kullanıma sunuldu. O zamandan beri veri akışı için önerilen yaklaşım olmuştur. Yapılandırılmış akışın ardındaki temel ilke, canlı veri akışını, tablodaki yeni bir satır gibi her zaman sürekli yeni verilerin eklendiği bir tablo olarak ele almaktır. CSV, JSON, ORC, Parquet gibi birkaç tanımlı yerleşik akış dosyası kaynağı ve Kafka ve Event Hubs gibi mesajlaşma hizmetleri için yerleşik destek vardır.
Önemli
Microsoft Fabric önizleme aşamasındadır.
Bu makale, yüksek aktarım hızına sahip üretim ortamlarında Spark yapısı akışı aracılığıyla olayların işlenmesini ve alımını iyileştirmeye yönelik içgörüler sağlar. Önerilen yaklaşımlar şunlardır:
- Veri akışı aktarım hızı iyileştirmesi
- Delta tablosunda yazma işlemlerini iyileştirme ve
- Olay toplu işlemi
Spark iş tanımları ve Spark not defterleri
Spark not defterleri, verilerinizden veya kodunuzdan içgörü elde etmek için fikirleri doğrulamak ve denemeler yapmak için mükemmel bir araçtır. Not defterleri veri hazırlama, görselleştirme, makine öğrenmesi ve diğer büyük veri senaryolarında yaygın olarak kullanılır. Spark iş tanımları, uzun süreler boyunca Spark kümesinde çalıştırılan etkileşimli olmayan kod odaklı görevlerdir. Spark iş tanımları sağlamlık ve kullanılabilirlik sağlar.
Spark not defterleri, kodunuzun mantığını test etmek ve tüm iş gereksinimlerini karşılamak için mükemmel bir kaynaktır. Ancak bir üretim senaryosunda çalışır durumda tutmak için En iyi çözüm, Yeniden Deneme İlkesi'nin etkinleştirildiği Spark iş tanımlarıdır.
Spark İş Tanımları için yeniden deneme ilkesi
Microsoft Fabric'te kullanıcı Spark İş Tanımı işleri için yeniden deneme ilkesi ayarlayabilir. İşteki betik sonsuz olsa da, betiği çalıştıran altyapı işi durdurmayı gerektiren bir soruna neden olabilir. Ya da temel altyapı düzeltme eki uygulama gereksinimleri nedeniyle iş ortadan kaldırılabilir. Yeniden deneme ilkesi, temel alınan sorunlar nedeniyle durdurulursa kullanıcının işi otomatik olarak yeniden başlatmak için kurallar ayarlamasına olanak tanır. Parametreler, işin ne sıklıkta yeniden başlatılması gerektiğini, sonsuz yeniden denemelere kadar ve yeniden denemeler arasındaki süreyi ayarlamayı belirtir. Bu şekilde kullanıcılar Spark İş Tanımı işlerinin, kullanıcı onları durdurmaya karar verene kadar sonsuz olarak çalışmaya devam etmesini sağlayabilir.
Akış kaynakları
Event Hubs ile akışın ayarlanması, Event Hubs ad alanı adını, hub adını, paylaşılan erişim anahtarı adını ve tüketici grubunu içeren temel yapılandırmayı gerektirir. Tüketici grubu, olay hub'ının tamamının görünümüdür. Birden çok kullanan uygulamanın olay akışının ayrı bir görünümüne sahip olmasını ve akışı kendi hızlarında ve uzaklıklarıyla bağımsız olarak okumasını sağlar.
Bölümler, yüksek hacimli verileri işleyebilmenin önemli bir parçasıdır. Tek bir işlemci, saniyedeki olayları işlemek için sınırlı kapasiteye sahipken, birden çok işlemci paralel olarak yürütürken daha iyi bir iş yapabilir. Bölümler, büyük hacimli olayları paralel olarak işleme olanağı sağlar.
Düşük alım oranıyla çok fazla bölüm kullanılıyorsa, bölüm okuyucular bu verilerin çok küçük bir kısmıyla ilgilenir ve bu da yetersiz işlemeye neden olur. İdeal bölüm sayısı doğrudan istenen işleme hızına bağlıdır. Olay işlemenizi ölçeklendirmek istiyorsanız daha fazla bölüm eklemeyi göz önünde bulundurun. Bir bölümde belirli bir aktarım hızı sınırı yoktur. Ancak, ad alanınızdaki toplam aktarım hızı, aktarım hızı birimi sayısıyla sınırlıdır. Ad alanınızdaki aktarım hızı birimi sayısını artırdıkça, eş zamanlı okuyucuların en yüksek aktarım hızına ulaşmasına izin vermek için fazladan bölümler isteyebilirsiniz.
Aktarım hızı senaryonuz için en fazla sayıda bölümü araştırmanız ve test etmeniz tavsiye edilir. Ancak 32 veya daha fazla bölüm kullanarak yüksek aktarım hızına sahip senaryolar görmek yaygın bir durumdur.
Spark uygulamasının Azure Event Hubs'a bağlanması için Apache Spark için Azure Event Hubs Bağlayıcısı (bağlantı) önerilir.
Akış havuzu olarak Lakehouse
Delta Lake, data lake storage çözümlerinin üzerinde ACID (bölünmezlik, tutarlılık, yalıtım ve dayanıklılık) işlemleri sağlayan bir açık kaynak depolama katmanıdır. Delta Lake ölçeklenebilir meta veri işleme, şema evrimi, zaman yolculuğu (veri sürümü oluşturma), açık biçim ve diğer özellikleri de destekler.
Doku Veri Madenciliği Delta Lake şunları yapmak için kullanılır:
- Spark SQL kullanarak verileri kolayca upsert (ekleme/güncelleştirme) ve silme.
- Verileri sorgulamak için harcanan zamanı en aza indirmek için verileri sıkıştırın.
- İşlemler yürütülmeden önceki ve sonraki tabloların durumunu görüntüleyin.
- Tablolarda gerçekleştirilen işlemlerin geçmişini alın.
Delta, writeStream'de kullanılan olası çıkış havuzları biçimlerinden biri olarak eklenir. Mevcut çıkış havuzları hakkında daha fazla bilgiyi burada bulabilirsiniz.
Aşağıdaki örnekte Delta Lake'e veri akışının nasıl mümkün olduğu gösterilmektedir.
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Schema = StructType([StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True)])
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.toTable("deltaeventstable")
Örnekte alıntı yapılan kod hakkında:
- format() , verilerin çıkış biçimini tanımlayan yönergedir.
- outputMode() akıştaki yeni satırların hangi şekilde yazıldığını tanımlar (yani ekleme, üzerine yazma).
- toTable() akışı yapılan verileri parametre olarak geçirilen değer kullanılarak oluşturulan bir Delta tablosunda kalıcı hale getirmektedir.
Delta yazmalarını iyileştirme
Veri bölümleme, güçlü bir akış çözümü oluşturmanın kritik bir parçasıdır: bölümleme, verilerin düzenlanma şeklini ve aktarım hızını da geliştirir. Delta işlemlerinden sonra dosyalar kolayca parçalanır ve bu da çok fazla küçük dosyaya neden olur. Ayrıca diske yazmanın uzun sürmesi nedeniyle çok büyük dosyalar da sorun oluşturur. Veri bölümlemeyle ilgili zorluk, en iyi dosya boyutlarına neden olan uygun dengeyi bulmaktır. Spark bellekte ve diskte bölümlendirmeyi destekler. Düzgün bölümlenmiş veriler, Delta Lake'e veri kalıcı hale getirmek ve Delta Lake'ten veri sorgulamak için en iyi performansı sağlayabilir.
- Disk üzerindeki verileri bölümlerken partitionBy() kullanarak verileri sütunlara göre bölümlemeyi seçebilirsiniz. partitionBy() , diske yazarken sağlanan bir veya birden çok sütuna göre büyük veri kümesini daha küçük dosyalara bölmek için kullanılan bir işlevdir. Bölümleme, büyük bir veri kümesiyle çalışırken sorgu performansını geliştirmenin bir yoludur. Çok küçük veya çok büyük bölümler oluşturan bir sütun seçmekten kaçının. İyi bir kardinaliteye sahip bir dizi sütunu temel alan bir bölüm tanımlayın ve verileri en uygun boyuttaki dosyalara bölün.
- Bellekteki verileri bölümleme işlemi repartition() veya coalesce() dönüştürmeleri kullanılarak yapılabilir, verileri birden çok çalışan düğümüne dağıtabilir ve Dayanıklı Dağıtılmış Veri Kümesi'nin (RDD) temellerini kullanarak verileri paralel olarak okuyup işleyebilen birden çok görev oluşturulabilir. Veri kümesinin, kümenin farklı düğümlerinde hesaplanabilen mantıksal bölümlere bölünmesine olanak tanır.
- repartition() bellekteki bölüm sayısını artırmak veya azaltmak için kullanılır. Yeniden bölümleme, tüm verileri ağ üzerinden yeniden oluşturur ve tüm bölümler arasında dengeler.
- coalesce() yalnızca bölüm sayısını verimli bir şekilde azaltmak için kullanılır. Bu, tüm bölümler arasında veri taşıma işleminin coalesce() kullanılarak daha düşük olduğu en iyi duruma getirilmiş repartition() sürümüdür.
Her iki bölümleme yaklaşımını birleştirmek, yüksek aktarım hızına sahip senaryolarda iyi bir çözümdür. repartition() bellekte belirli sayıda bölüm oluştururken partitionBy() her bellek bölümü ve bölümleme sütunu için diske dosya yazar. Aşağıdaki örnek, aynı Spark işinde her iki bölümleme stratejisinin kullanımını göstermektedir: veriler önce bellekteki 48 bölüme bölünür (toplam 48 CPU çekirdeğimiz olduğu varsayılarak) ve ardından yükteki mevcut iki sütuna göre diske bölümlenir.
import pyspark.sql.functions as f
from pyspark.sql.types import *
import json
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
İyileştirilmiş Yazma
Delta Lake'e yazma işlemini iyileştirmeye yönelik bir diğer seçenek de İyileştirilmiş Yazma kullanmaktır. İyileştirilmiş Yazma, verilerin Delta tablosuna yazılma biçimini geliştiren isteğe bağlı bir özelliktir. Spark, verileri yazmadan önce bölümleri birleştirir veya bölerek diske yazılan verilerin aktarım hızını en üst düzeye çıkarır. Ancak, tam karıştırmaya neden olur, bu nedenle bazı iş yükleri için performans düşüşlerine neden olabilir. Disk üzerindeki verileri bölümlendirmek için coalesce() ve/veya repartition() kullanan işler, Bunun yerine İyileştirilmiş Yazma kullanmaya başlamak için yeniden düzenleyebilir.
Aşağıdaki kod, İyileştirilmiş Yazma kullanımına bir örnektir. partitionBy() işlevinin hala kullanıldığını unutmayın.
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true)
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.toTable("deltaeventstable")
Olayları toplu işleme
Delta lake'e veri alımı için harcanan zamanı iyileştirmek üzere işlem sayısını en aza indirmek için olayları toplu hale getirmek pratik bir alternatiftir.
Tetikleyiciler, akış sorgusunun ne sıklıkta yürütülmesi (tetiklenmesi) gerektiğini tanımlar ve yeni bir veri yayar, bunları ayarlamak mikrobatlar için düzenli bir işleme zaman aralığı tanımlar, verileri biriktirir ve olayları sürekli diske yazmak yerine birkaç kalıcı işlem halinde işler.
Aşağıdaki örnekte olayların bir dakikalık aralıklarla düzenli aralıklarla işlendiği bir akış sorgusu gösterilmektedir.
rawData = df \
.withColumn("bodyAsString", f.col("body").cast("string")) \
.select(from_json("bodyAsString", Schema).alias("events")) \
.select("events.*") \
.repartition(48) \
.writeStream \
.format("delta") \
.option("checkpointLocation", " Files/checkpoint") \
.outputMode("append") \
.partitionBy("<column_name_01>", "<column_name_02>") \
.trigger(processingTime="1 minute") \
.toTable("deltaeventstable")
Delta tablosu yazma işlemlerinde olayların toplu olarak birleştirilmesinin avantajı, küçük dosyalardan kaçınarak daha fazla veri içeren daha büyük Delta dosyaları oluşturmasıdır. Alınan veri miktarını analiz etmeli ve Delta kitaplığı tarafından oluşturulan Parquet dosyalarının boyutunu iyileştirmek için en iyi işleme süresini bulmalısınız.
İzleme
Spark 3.1 ve üzeri sürümler, aşağıdaki akış ölçümlerini içeren yerleşik yapılandırılmış akış kullanıcı arabirimine (bağlantı) sahiptir:
- Giriş Hızı
- İşlem Hızı
- Giriş Satırları
- Toplu İş Süresi
- İşlem Süresi
Sonraki adımlar
- Akış verilerini lakehouse'a alın ve SQL uç noktasıyla erişim sağlayın.