Spark yapılandırılmış akış ile akış verilerini lakehouse'a alma

Yapılandırılmış Akış, Spark üzerinde oluşturulmuş ölçeklenebilir ve hataya dayanıklı bir akış işleme altyapısıdır. Spark, veriler gelmeye devam ettikçe akış işlemini artımlı ve sürekli olarak çalıştırmayı üstlenir.

Yapılandırılmış akış Spark 2.2'de kullanılabilir hale geldi. O zamandan beri veri akışı için önerilen yaklaşımdır. Yapılandırılmış akışın ardındaki temel ilke, canlı veri akışını, tablodaki yeni bir satır gibi her zaman yeni verilerin eklendiği bir tablo olarak ele almaktır. CSV, JSON, ORC, Parquet gibi birkaç tanımlı yerleşik akış dosyası kaynağı ve Kafka ve Event Hubs gibi mesajlaşma hizmetleri için yerleşik destek vardır.

Bu makale, yüksek aktarım hızına sahip üretim ortamlarında Spark yapısı akışı aracılığıyla olayların işlenmesini ve alımını iyileştirmeye yönelik içgörüler sağlar. Önerilen yaklaşımlar şunlardır:

  • Veri akışı aktarım hızı iyileştirmesi
  • Delta tablosunda yazma işlemlerini iyileştirme ve
  • Olay toplu işlemi

Spark iş tanımları ve Spark not defterleri

Spark not defterleri, verilerinizden veya kodunuzdan içgörüler elde etmek için fikirleri doğrulamak ve denemeler yapmak için mükemmel bir araçtır. Not defterleri veri hazırlama, görselleştirme, makine öğrenmesi ve diğer büyük veri senaryolarında yaygın olarak kullanılır. Spark iş tanımları, spark kümesinde uzun süre çalışan etkileşimli olmayan kod odaklı görevlerdir. Spark iş tanımları sağlamlık ve kullanılabilirlik sağlar.

Spark not defterleri, kodunuzun mantığını test etmek ve tüm iş gereksinimlerini karşılamak için mükemmel bir kaynaktır. Ancak üretim senaryosunda çalışır durumda tutmak için En iyi çözüm, Yeniden Deneme İlkesi'nin etkinleştirildiği Spark iş tanımlarıdır.

Spark İş Tanımları için yeniden deneme ilkesi

Microsoft Fabric'te kullanıcı Spark İş Tanımı işleri için yeniden deneme ilkesi ayarlayabilir. İşteki betik sonsuz olsa da, betiği çalıştıran altyapı işi durdurmayı gerektiren bir soruna neden olabilir. Ya da temel altyapı düzeltme eki uygulama gereksinimleri nedeniyle iş ortadan kaldırılabilir. Yeniden deneme ilkesi, temel alınan sorunlar nedeniyle durursa kullanıcının işi otomatik olarak yeniden başlatmak için kurallar ayarlamasına olanak tanır. Parametreler, işin ne sıklıkta yeniden başlatılması gerektiğini, sonsuz yeniden denemelere kadar ve yeniden denemeler arasında zaman ayarlamayı belirtir. Bu şekilde kullanıcılar Spark İş Tanımı işlerinin, kullanıcı onları durdurmaya karar verene kadar sonsuz bir şekilde çalışmaya devam etmesini sağlayabilir.

Akış kaynakları

Event Hubs ile akışın ayarlanması, Event Hubs ad alanı adı, hub adı, paylaşılan erişim anahtarı adı ve tüketici grubunu içeren temel yapılandırmayı gerektirir. Tüketici grubu, olay hub'ının tamamının görünümüdür. Birden çok kullanan uygulamanın olay akışının ayrı bir görünümüne sahip olmasını ve akışı kendi hızlarında ve uzaklıklarıyla bağımsız olarak okumasını sağlar.

Bölümler, yüksek hacimli verileri işleyebilmenin önemli bir parçasıdır. Tek bir işlemci, saniye başına olayları işlemek için sınırlı kapasiteye sahipken, birden çok işlemci paralel olarak yürütürken daha iyi bir iş yapabilir. Bölümler, büyük hacimli olayları paralel olarak işleme olanağı sağlar.

Düşük alım oranıyla çok fazla bölüm kullanılıyorsa, bölüm okuyucular bu verilerin küçük bir bölümüyle ilgilenir ve bu da iyi amaçlı olmayan işlemeye neden olur. İdeal bölüm sayısı doğrudan istenen işleme oranına bağlıdır. Olay işlemenizi ölçeklendirmek istiyorsanız, daha fazla bölüm eklemeyi göz önünde bulundurun. Bir bölümde belirli bir aktarım hızı sınırı yoktur. Ancak, ad alanınızdaki toplam aktarım hızı, aktarım hızı birimi sayısıyla sınırlıdır. Ad alanınızdaki aktarım hızı birimi sayısını artırdıkça, eş zamanlı okuyucuların maksimum aktarım hızına ulaşmasına izin vermek için ek bölümler isteyebilirsiniz.

Öneri, aktarım hızı senaryonuz için en fazla sayıda bölümü araştırmak ve test etmektir. Ancak 32 veya daha fazla bölüm kullanarak yüksek aktarım hızına sahip senaryoları görmek yaygın bir durumdur.

Spark uygulamasını Azure Event Hubs'a bağlamak için Azure Event Hubs Bağlan or for Apache Spark (azure-event-hubs-spark) önerilir.

Akış havuzu olarak Lakehouse

Delta Lake, data lake storage çözümlerinin üzerinde ACID (bölünmezlik, tutarlılık, yalıtım ve dayanıklılık) işlemleri sağlayan bir açık kaynak depolama katmanıdır. Delta Lake ayrıca ölçeklenebilir meta veri işleme, şema evrimi, zaman yolculuğu (veri sürümü oluşturma), açık biçim ve diğer özellikleri destekler.

Doku Veri Madenciliği Delta Lake şunları yapmak için kullanılır:

  • Spark SQL kullanarak kolayca upsert (ekleme/güncelleştirme) ve silme.
  • Verileri sorgulamak için harcanan süreyi en aza indirmek için verileri sıkıştırın.
  • İşlemler yürütülmeden önceki ve sonraki tabloların durumunu görüntüleyin.
  • Tablolarda gerçekleştirilen işlemlerin geçmişini alın.

Delta, writeStream'de kullanılan olası çıkış havuz biçimlerinden biri olarak eklenir. Mevcut çıkış havuzları hakkında daha fazla bilgi için bkz . Spark Yapılandırılmış Akış Programlama Kılavuzu.

Aşağıdaki örnekte Delta Lake'e veri akışının nasıl mümkün olduğu gösterilmektedir.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Örnekte kırpılan kod hakkında:

  • format() , verilerin çıkış biçimini tanımlayan yönergedir.
  • outputMode() , akıştaki yeni satırların ne şekilde yazıldığını (ekleme, üzerine yazma) tanımlar.
  • toTable() akışı yapılan verileri parametre olarak geçirilen değer kullanılarak oluşturulan bir Delta tablosunda kalıcı hale getirmektedir.

Delta yazmalarını iyileştirme

Veri bölümleme, güçlü bir akış çözümü oluşturmanın kritik bir parçasıdır: bölümleme, verilerin düzenlanma şeklini ve ayrıca aktarım hızını artırır. Delta işlemlerinden sonra dosyalar kolayca parçalanır ve bu da çok fazla küçük dosyaya neden olur. Ayrıca çok büyük dosyalar da diske uzun süre yazıldığından bir sorundur. Veri bölümlemeyle ilgili zorluk, en iyi dosya boyutlarına neden olan uygun dengeyi bulmaktır. Spark, bellekte ve diskte bölümleme özelliğini destekler. Düzgün bölümlenmiş veriler, Delta Lake'e veri kalıcı hale getirmek ve Delta Lake'ten veri sorgulamak için en iyi performansı sağlayabilir.

  • Diskte verileri bölümlerken partitionBy() kullanarak verileri sütunlara göre bölümlemeyi seçebilirsiniz. partitionBy() , diske yazarken sağlanan bir veya birden çok sütunu temel alarak büyük semantik modeli daha küçük dosyalara bölmek için kullanılan bir işlevdir. Bölümleme, büyük bir anlam modeliyle çalışırken sorgu performansını geliştirmenin bir yoludur. Çok küçük veya çok büyük bölümler oluşturan bir sütun seçmekten kaçının. İyi bir kardinaliteye sahip bir sütun kümesini temel alan bir bölüm tanımlayın ve verileri en uygun boyuttaki dosyalara bölün.
  • Bellekteki verileri bölümleme işlemi repartition() veya coalesce() dönüştürmeleri kullanılarak yapılabilir, verileri birden çok çalışan düğümüne dağıtabilir ve Dayanıklı Dağıtılmış Veri Kümesi'nin (RDD) temellerini kullanarak verileri paralel olarak okuyabilen ve işleyebilen birden çok görev oluşturulabilir. Semantik modelin, kümenin farklı düğümlerinde hesaplanabilen mantıksal bölümlere bölünmesine olanak tanır.
    • repartition() bellekteki bölüm sayısını artırmak veya azaltmak için kullanılır. Yeniden bölümleme, tüm verileri ağ üzerinden yeniden oluşturur ve tüm bölümler arasında dengeler.
    • coalesce() yalnızca bölümlerin sayısını verimli bir şekilde azaltmak için kullanılır. Bu, verilerin tüm bölümler arasındaki hareketinin coalesce() kullanılarak daha düşük olduğu iyileştirilmiş bir repartition() sürümüdür.

Her iki bölümleme yaklaşımını da birleştirmek, yüksek aktarım hızına sahip senaryolarda iyi bir çözümdür. repartition() bellekte belirli sayıda bölüm oluştururken partitionBy() her bellek bölümü ve bölümleme sütunu için diske dosya yazar. Aşağıdaki örnekte, aynı Spark işinde her iki bölümleme stratejisinin de kullanımı gösterilmektedir: veriler önce bellekte 48 bölüme bölünür (toplam 48 CPU çekirdeğimiz olduğu varsayılarak) ve ardından yükteki mevcut iki sütuna göre diske bölümlenir.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

İyileştirilmiş Yazma

Delta Lake'e yazmaları iyileştirmeye yönelik bir diğer seçenek de İyileştirilmiş Yazma'nın kullanılmasıdır. İyileştirilmiş Yazma, verilerin Delta tablosuna yazılma biçimini geliştiren isteğe bağlı bir özelliktir. Spark, verileri yazmadan önce bölümleri birleştirir veya bölerek diske yazılan verilerin aktarım hızını en üst düzeye çıkarır. Ancak, tam karışıklığa neden olur, bu nedenle bazı iş yükleri için performans düşüşlerine neden olabilir. Disk üzerindeki verileri bölümlendirmek için coalesce() ve/veya repartition() kullanan işler, bunun yerine İyileştirilmiş Yazma kullanmaya başlamak için yeniden düzenleyebilir.

Aşağıdaki kod, İyileştirilmiş Yazma kullanımına bir örnektir. partitionBy() işlevinin hala kullanıldığını unutmayın.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Olayları toplu işleme

Delta lake'e veri alımı için harcanan süreyi geliştirmek üzere işlem sayısını en aza indirmek için olayları toplu hale getirmek pratik bir alternatiftir.

Tetikleyiciler, akış sorgusunun ne sıklıkta yürütülmesi (tetiklenmesi) gerektiğini tanımlar ve yeni bir veri yayar, bunları ayarlamak mikrobatlar için düzenli olarak bir işleme zaman aralığı tanımlar, verileri biriktirir ve olayları sürekli olarak diske yazmak yerine birkaç kalıcı işlem halinde toplu hale getirir.

Aşağıdaki örnekte olayların belirli aralıklarla bir dakikalık aralıklarla işlendiği bir akış sorgusu gösterilmektedir.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", " Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Delta tablosu yazma işlemlerinde olayları toplu olarak birleştirmenin avantajı, içinde daha fazla veri bulunan daha büyük Delta dosyaları oluşturması ve küçük dosyalardan kaçınmasıdır. Alınan veri miktarını analiz etmeli ve Delta kitaplığı tarafından oluşturulan Parquet dosyalarının boyutunu iyileştirmek için en iyi işleme süresini bulmalısınız.

İzleme

Spark 3.1 ve üzeri sürümler , aşağıdaki akış ölçümlerini içeren yerleşik yapılandırılmış akış kullanıcı arabirimine sahiptir:

  • Giriş Hızı
  • İşlem Hızı
  • Giriş Satırları
  • Toplu İş Süresi
  • İşlem Süresi