Aracılığıyla paylaş


İşlem hatları ile verileri dönüştürme

Bu makalede, veri kümelerinde dönüşümleri bildirmek ve kayıtların sorgu mantığı aracılığıyla nasıl işleneceğini belirtmek için işlem hatlarını nasıl kullanabileceğiniz açıklanmaktadır. Ayrıca, işlem hatları oluşturmaya yönelik yaygın dönüştürme desenlerine örnekler içerir.

DataFrame döndüren herhangi bir sorguda bir veri kümesi tanımlayabilirsiniz. Lakeflow Spark Bildirimli İşlem Hatlarında dönüştürmeler olarak Apache Spark yerleşik işlemlerini, UDF'leri, özel mantığı ve MLflow modellerini kullanabilirsiniz. Veriler işlem hattınıza alındıktan sonra, yukarı akış kaynaklarına göre yeni veri kümeleri tanımlayarak yeni akış tabloları, gerçekleştirilmiş görünümler ve standart görünümler oluşturabilirsiniz.

İşlem hattında durum bilgisi olan işlemeyi etkili bir şekilde gerçekleştirmeyi öğrenmek için bkz. Filigranlarla durum bilgisi işlemeyi iyileştirme.

Görünümler, gerçekleştirilmiş görünümler ve akış tabloları ne zaman kullanılır?

İşlem hattı sorgularınızı uygularken verimli ve sürdürülebilir olduklarından emin olmak için en iyi veri kümesi türünü seçin.

Aşağıdakileri yapmak için bir görünüm kullanmayı düşünün:

  • İstediğiniz büyük veya karmaşık bir sorguyu daha kolay yönetilebilir sorgulara bölün.
  • Beklentileri kullanarak ara sonuçları doğrulayın.
  • Kalıcı olması gerekmeyen sonuçlar için depolama ve işlem maliyetlerini azaltın. Tablolar materyalize edildiğinden, ek hesaplama ve depolama kaynakları gerektirir.

Aşağıdaki durumlarda materyalize edilmiş bir görünüm kullanmayı göz önünde bulundurun.

  • Birden çok alt sorgu tabloyu tüketir. Görünümler isteğe bağlı olarak hesaplandığından, görünüm her sorgulandığında yeniden hesaplanır.
  • Diğer ardışık düzenler, işler veya sorgular tabloyu tüketir. Görünümler oluşturulmadığı için, bunları yalnızca aynı veri hattında kullanabilirsiniz.
  • Geliştirme sırasında sorgunun sonuçlarını görüntülemek istiyorsunuz. Tablolar gerçekleştirilmiş olduğundan ve işlem hattı dışında görüntülenebildiği ve sorgulanabildiği için, geliştirme sırasında tabloların kullanılması hesaplamaların doğruluğunu doğrulamaya yardımcı olabilir. Doğruladıktan sonra, gerçekleştirme gerektirmeyen sorguları görünümlere dönüştürün.

Aşağıdaki durumlarda akış tablosu kullanmayı göz önünde bulundurun:

  • Sürekli veya artımlı olarak büyüyen bir veri kaynağında sorgu tanımlanır.
  • Sorgu sonuçları artımlı olarak hesaplanmalıdır.
  • İşlem hattı için yüksek aktarım hızı ve düşük gecikme süresi gerekir.

Uyarı

Akış tabloları her zaman akış kaynaklarına göre tanımlanır. Güncellemeleri CDC akışlarından uygulamak için AUTO CDC ... INTO ile yayın kaynaklarını da kullanabilirsiniz. Bkz AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirin.

Tabloları hedef şemanın dışında tutma

Dış tüketime yönelik olmayan ara tabloları hesaplamanız gerekiyorsa, TEMPORARY anahtar sözcüğünü kullanarak bunların bir şemada yayımlanmasını engelleyebilirsiniz. Geçici tablolar, Lakeflow Spark Bildirimli İşlem Hatları semantiğine göre verileri depolamaya ve işlemeye devam eder ancak geçerli işlem hattı dışından erişilmemelidir. Geçici bir tablo, onu oluşturan iş hattının ömrü boyunca kalır. Geçici tabloları bildirmek için aşağıdaki söz dizimini kullanın:

SQL

CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;

Piton

@dp.table(
  temporary=True)
def temp_table():
  return ("...")

Akış tablolarını ve gerçekleştirilmiş görünümleri tek bir işlem hattında birleştirme

Akış tabloları, Apache Spark Yapılandırılmış Akış'ın işleme garantilerini devralır ve yeni satırların her zaman değiştirilmek yerine kaynak tabloya eklendiği, yalnızca ekleme veri kaynaklarından gelen sorguları işlemek üzere yapılandırılır.

Uyarı

Varsayılan olarak, akış tabloları yalnızca eklemeli veri kaynakları gerektirir, ancak bir akış kaynağı, güncellemeler veya silmeler gerektiren başka bir akış tablosu olduğunda, skipChangeCommits bayrağı ile bu davranışı geçersiz kılabilirsiniz.

Yaygın bir akış düzeni, bir işlem hattında ilk veri kümelerini oluşturmak için kaynak verilerin alımını içerir. Bu ilk veri kümeleri genellikle bronz tablolar olarak adlandırılır ve genellikle basit dönüştürmeler gerçekleştirir.

Buna karşılık, işlem hattındaki son tablolar genellikle altın tablolar (gold tables) olarak adlandırılır ve karmaşık toplamalar veya AUTO CDC ... INTO işleminin hedeflerinden okuma gerektirir. Bu işlemler doğal olarak eklemeler yerine güncelleştirmeler oluşturduğundan, akış tablolarına giriş olarak desteklenmez. Bu dönüşümler materyalize edilmiş görünümler için daha uygundur.

Akış tablolarını ve gerçekleştirilmiş görünümleri tek bir işlem hattında birleştirerek işlem hattınızı basitleştirebilir, ham verilerin yüksek maliyetli bir şekilde yeniden alımını veya yeniden işlenmesini önleyebilir ve verimli bir şekilde kodlanmış ve filtrelenmiş bir veri kümesi üzerinde karmaşık toplamaları hesaplamak için SQL'in tüm gücüne sahip olabilirsiniz. Aşağıdaki örnekte bu tür bir karma işleme gösterilmektedir:

Uyarı

Bu örneklerde, bulut depolamadan dosya yüklemek için Otomatik Yükleyici kullanılır. Unity Kataloğu etkin bir işlem hattında Otomatik Yükleyici ile dosya yüklemek için dış konumları kullanmanız gerekir. Unity Kataloğu'nu işlem hatlarıyla kullanma hakkında daha fazla bilgi edinmek için bkz. Unity Kataloğu'nu işlem hatlarıyla kullanma.

Piton

@dp.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dp.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return spark.readStream.table("streaming_bronze").where(...)

@dp.materialized_view
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return spark.read.table("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
  "abfss://path/to/raw/data",
  format => "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...

CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id

JSON dosyalarını Azure depolama alanından artımlı olarak almak için Otomatik Yükleyici kullanma hakkında daha fazla bilgi edinin.

Akış-statik birleşimler

Akış-statik birleşimler, öncelikle statik bir boyut tablosu içeren yalnızca eklenebilir verilerin sürekli akışını denormalize ettiğinizde iyi bir seçimdir.

Her işlem hattı güncelleştirmesi ile akıştan yeni kayıtlar statik tablonun en güncel anlık görüntüsüyle birleştirilir. Akış tablosundan karşılık gelen veriler işlendikten sonra statik tabloya kayıtlar eklenir veya güncelleştirilirse, tam yenileme yapılmadığı sürece sonuç kayıtları yeniden hesaplanmaz.

Tetiklenen yürütme için yapılandırılmış işlem hatlarında, statik tablo güncelleştirmenin başlatıldığı anda sonuçları döndürür. Sürekli yürütme için yapılandırılmış işlem hatlarında, tablo bir güncelleştirmeyi her işlediğinde statik tablonun en son sürümü sorgulanır.

Aşağıda bir akış-statık birleştirme örneği verilmiştir.

Piton

@dp.table
def customer_sales():
  return spark.readStream.table("sales").join(spark.read.table("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
  INNER JOIN LEFT customers USING (customer_id)

Toplamaları verimli bir şekilde hesaplama

Akış tablolarını kullanarak count, min, max veya sum gibi basit dağıtım toplamalarını ve ortalama veya standart sapma gibi cebirsel toplamaları artımlı olarak hesaplayabilirsiniz. Databricks, GROUP BY country yan tümcesine sahip bir sorgu gibi sınırlı sayıda gruba sahip sorgular için artımlı toplama önerir. Her güncelleştirmede yalnızca yeni giriş verileri okunur.

Artımlı toplamalar gerçekleştiren Lakeflow Spark Bildirimli İşlem Hatları sorguları yazma hakkında daha fazla bilgi edinmek için bkz. Filigranlarla pencereli toplamalar gerçekleştirme.

Lakeflow Spark Bildirimli İşlem Hatlarında MLflow modellerini kullanma

Uyarı

Unity Kataloğu özellikli bir işlem hattında MLflow modellerini kullanmak için işlem hattınızın preview kanalını kullanacak şekilde yapılandırılması gerekir. current kanalını kullanmak için işlem hattınızı Hive meta veri deposunda yayımlayacak şekilde yapılandırmanız gerekir.

İşlem hatlarında MLflow tarafından eğitilen modelleri kullanabilirsiniz. MLflow modelleri, Azure Databricks'te dönüşüm olarak değerlendirilir; bu da spark DataFrame girişi üzerine hareket ettikleri ve sonuçları Spark DataFrame olarak döndürdikleri anlamına gelir. Lakeflow Spark Bildirimli İşlem Hatları DataFrame'lerde veri kümelerini tanımladığından, MLflow kullanan Apache Spark iş yüklerini yalnızca birkaç kod satırıyla işlem hatlarına dönüştürebilirsiniz. MLflow hakkında daha fazla bilgi için bkz. ML modeli yaşam döngüsü için MLflow.

MLflow modelini çağıran bir Python betiğiniz varsa, bu kodu @dp.table veya @dp.materialized_view dekoratörünü kullanarak ve işlevlerin dönüşüm sonuçlarını döndürecek şekilde tanımlandığından emin olarak bir işlem hattına uyarlayabilirsiniz. Lakeflow Spark Bildirimli İşlem Hatları, varsayılan olarak MLflow'u yüklemez, bu nedenle %pip install mlflow ile MLflow kütüphanelerini yüklediğinizden ve kaynağınızın en üstünde mlflow ve dp içeri aktardığınızdan emin olun. İşlem hattı söz dizimine giriş için bkz. Python ile işlem hattı kodu geliştirme.

İşlem hatlarında MLflow modellerini kullanmak için aşağıdaki adımları tamamlayın:

  1. MLflow modelinin çalıştırma kimliğini ve model adını alın. Çalıştırma kimliği ve model adı, MLflow modelinin URI'sini oluşturmak için kullanılır.
  2. MLflow modelini yüklemek üzere Spark UDF tanımlamak için URI'yi kullanın.
  3. MLflow modelini kullanmak için tablo tanımlarınızdaki UDF'yi çağırın.

Aşağıdaki örnekte bu desen için temel söz dizimi gösterilmektedir:

%pip install mlflow

from pyspark import pipelines as dp
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dp.materialized_view
def model_predictions():
  return spark.read.table(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Tam bir örnek olarak aşağıdaki kod, kredi riski verileri üzerinde eğitilmiş bir MLflow modelini yükleyen loaded_model_udf adlı bir Spark UDF tanımlar. Tahmin yapmak için kullanılan veri sütunları, bağımsız değişken olarak UDF'ye iletilir. tablo loan_risk_predictionsloan_risk_input_dataiçindeki her satır için tahminleri hesaplar.

%pip install mlflow

from pyspark import pipelines as dp
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dp.materialized_view(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return spark.read.table("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

El ile silmeleri veya güncelleştirmeleri saklama

Lakeflow Spark Bildirimli İşlem Hatları, bir tablodan kayıtları el ile silmenize veya güncelleştirmenize ve aşağı akış tablolarını yeniden derlemek için yenileme işlemi yapmanıza olanak tanır.

varsayılan olarak, işlem hatları tablo sonuçlarını her güncelleştirildiğinde giriş verilerine göre yeniden derler, bu nedenle silinen kaydın kaynak verilerden yeniden yüklenmediğinden emin olmanız gerekir. pipelines.reset.allowed tablo özelliğinin false olarak ayarlanması, tablodaki yenilemeleri engeller, ancak tablolara artımlı yazmaların veya yeni verilerin tabloya akmasını engellemez.

Aşağıdaki diyagramda iki akış tablosunun kullanıldığı bir örnek gösterilmektedir:

  • raw_user_table bir kaynaktan ham kullanıcı verilerini alır.
  • bmi_table, raw_user_tableağırlık ve boyunu kullanarak BMI puanlarını artımlı olarak hesaplar.

raw_user_table kullanıcı kayıtlarını el ile silmek veya güncellemek ve bmi_table'i yeniden hesaplamak istiyorsunuz.

Veri diyagramını tutma

Aşağıdaki kodda, pipelines.reset.allowed tablo özelliğinin false olarak ayarlandığı ve raw_user_table için tam yenilemeyi devre dışı bırakılacak şekilde ayarlandığı gösterilmektedir; böylece hedeflenen değişiklikler zaman içinde korunur, ancak işlem hattı güncelleştirmesi çalıştırıldığında aşağı akış tabloları yeniden derlenir:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);