Lakehouse öğreticisi: Lakehouse'da verileri hazırlama ve dönüştürme

Bu öğreticide, lakehouse'unuzda ham verileri dönüştürmek ve hazırlamak için Spark çalışma zamanı ile not defterlerini kullanacaksınız.

Önkoşullar

Başlamadan önce, bu serideki önceki eğitimleri tamamlamanız gerekir:

  1. Göl evi oluşturma
  2. Lakehouse'a veri yükleme
  3. Lakehouse şemaları lakehouse'ınızda etkinleştirilmiş olduğundan emin olun.

Verileri hazırlama

Önceki öğretici adımlardan, kaynaktan lakehouse'un Dosyalar bölümüne alınan ham verileriniz bulunmaktadır. Artık bu verileri dönüştürebilir ve Delta tabloları oluşturmaya hazırlayabilirsiniz.

  1. Not defterlerini Lakehouse Öğreticisi Kaynak Kodu klasöründen indirin.

  2. Tarayıcınızda, Fabric portalında Fabric çalışma alanınıza gidin.

  3. Bu bilgisayardan Not Defterini>İçeri> seçin.

    Doku portalında not defterini içeri aktarma seçeneğini gösteren ekran görüntüsü.

  4. Ekranın sağ tarafında açılan İçe Aktarma durumu bölmesinden karşıya yükle'yi seçin.

  5. Yalnızca tercih ettiğiniz kodlama diliyle eşleşen not defterini seçin.

    • PySpark (Prepare and transform data - PySpark.ipynb)
    • Spark SQL (Prepare and transform data - Spark SQL.ipynb)
  6. 'ı seçin. tarayıcı penceresinin sağ üst köşesinde içeri aktarmanın durumunu belirten bir bildirim görüntülenir.

  7. İçeri aktarma işlemi başarılı olduktan sonra, içeri aktarılan not defterini doğrulamak için çalışma alanının öğeler görünümüne gidin.

    İçeri aktarılan not defterlerinin listesini ve lakehouse'un nerede seçileceği gösteren ekran görüntüsü.

  8. Wwilakehouse lakehouse'u seçerek açın; böylece bir sonraki açışınızda açtığınız not defteri buna bağlanır.

  9. Üst gezinti menüsünden Not defterini aç> aç'ı seçin.

    Başarıyla içeri aktarılan not defterlerinin listesini gösteren ekran görüntüsü.

  10. PySpark veya Spark SQL için içeri aktarılan not defterinizi seçin ve Aç'ı seçin. Not defteri, göl evi Gezgini'nde gösterildiği gibi açılmış göl evinizle zaten bağlantılıdır.

Artık Delta tablolarınızı oluşturan ve dönüştüren not defteri hücrelerini çalıştırmaya hazırsınız.

Aşağıdaki bölümlerde not defteri hücrelerini sıralı olarak çalıştırın. Bir hücreyi yürütmek için, üzerine gelindiğinde hücrenin solunda görünen Çalıştır simgesini seçin. Tüm hücreleri sırayla çalıştırmak için üst şeritte (Giriş) Tümünü çalıştır'ı da seçebilirsiniz.

Önemli

Bu kılavuz, lakehouse şemalarının etkinleştirilmesini gerektirir. Şemalar etkinleştirilmediyse, bu öğreticideki kod amaçlandığı gibi çalışmaz.

İçeri aktarılan not defterinde hem Yol 1 hem de Yol 2 bölümlerini görürsünüz. Bu öğretici kapsamında Yol 1'i (lakehouse şemaları etkin) kullanın ve Yol 2'yi yoksayın (lakehouse şemaları etkin değil).

Delta tabloları oluşturma

Bu bölümde, not defteri hücrelerini çalıştırarak ham verilerden Delta tabloları oluşturacaksınız.

Tablolar, analiz verilerini düzenlemeye yönelik yaygın bir desen olan yıldız şemasını izler:

  • Olgu tablosu (fact_sale), işletmenin ölçülebilir olaylarını (bu örnekte miktarlar, fiyatlar ve kar içeren bireysel satış işlemleri) içerir.
  • Boyut tabloları (dimension_city, dimension_customer, dimension_date, , dimension_employee), dimension_stock_itembir satışın nerede gerçekleştiği, kimin yaptığı ve ne zaman olduğu gibi olgulara bağlam sağlayan açıklayıcı öznitelikleri içerir.

Bu öğretici sayfasında, içeri aktardığınız not defteriyle eşleşen sekmeyi seçin ve tüm adımlar için aynı sekmeyi kullanmaya devam edin. Makalede sekmeler var, not defterinde değil.

  1. Hücre 1 - Spark oturumu ayarları. Bu hücre, sonraki hücrelerde verilerin nasıl yazıldığını ve okunduğunu optimize eden iki Fabric özelliğini etkin hale getirir. V-order, daha hızlı okuma ve daha iyi sıkıştırma için Parquet dosya düzenini iyileştirir. Yazmayı iyileştirme , yazılan dosya sayısını azaltır ve tek tek dosya boyutunu artırır.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
    
  2. Hücre 2 - Olgu - Satış. Bu hücre, Files/wwi-raw-data/full/fact_sale_1y_full ham parquet verilerini okur, tarih bölümü sütunlarını (Yıl, Çeyrek ve Ay) ekler ve fact_sale, Yıl ve Çeyrek ile bölümlenmiş bir Delta tablosu olarak yazar.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/dbo/" + table_name)
    
  3. Hücre 3 - Boyutlar. Bu hücre, beş boyutlu parquet veri kümelerini okur ve Delta tabloları (dimension_city, dimension_customer, dimension_date, dimension_employee ve dimension_stock_item) olarak Tables/dbo/... altında yazar.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    def loadFullDataFromSource(table_name):
       df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
       df = df.drop("Photo")
       df.write.mode("overwrite").format("delta").save("Tables/dbo/" + table_name)
    
    full_tables = [
       'dimension_city',
       'dimension_customer',
       'dimension_date',
       'dimension_employee',
       'dimension_stock_item'
    ]
    
    for table in full_tables:
       loadFullDataFromSource(table)
    
  4. Oluşturulan tabloları doğrulamak için gezginde wwilakehouse lakehouse'a sağ tıklayın ve yenile'yi seçin. Tablolar görüntülenir.

    Lakehouse gezgininde oluşturduğunuz tabloların nerede bulunduğunu gösteren ekran görüntüsü.

İş verisi toplamaları veya raporlamaları için verileri dönüştürme

Bu bölümde, aynı not defterinde devam eder ve sonraki hücreleri çalıştırarak önceki bölümde oluşturduğunuz Delta tablolarından toplama tabloları oluşturursunuz.

  1. Not defterinin wwilakehouse'a bağlı olduğundan emin olun.

  2. Hücre 4 - Dönüştürme için kaynak tabloları yükleyin (yalnızca PySpark). PySpark not defterini kullanıyorsanız, aşağıdaki toplama adımları için Delta tablolarını DataFrame'lere yüklemek için bu hücreyi çalıştırın.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    df_fact_sale = spark.read.format("delta").load("Tables/dbo/fact_sale")
    df_dimension_date = spark.read.format("delta").load("Tables/dbo/dimension_date")
    df_dimension_city = spark.read.format("delta").load("Tables/dbo/dimension_city")
    
  3. Hücre 5 - aggregate_sale_by_date_city'ı oluşturun. Bu hücre satış, tarih ve şehir verilerini birleştirir, ardından şehir düzeyinde toplama tablosunu oluşturur.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    sale_by_date_city = (
       df_fact_sale.alias("sale")
       .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner")
       .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner")
       .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")
       .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")
       .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")
       .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")
       .withColumnRenamed("sum(Profit)", "SumOfProfit")
       .orderBy("date.Date", "city.StateProvince", "city.City")
    )
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_city")
    
  4. Hücre 6 - Oluştur aggregate_sale_by_date_employee. Bu hücre satış, tarih ve çalışan verilerini birleştirir, ardından çalışan düzeyinde toplama tablosunu oluşturur.

    Bu hücreyi çalıştırın ve sonraki adıma geçmeden önce bitmesini bekleyin.

    spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
               DD.Date, DD.CalendarMonthLabel
            , DD.Day, DD.ShortMonth Month, CalendarYear Year
            , DE.PreferredName, DE.Employee
            , SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
            , SUM(FS.TaxAmount) SumOfTaxAmount
            , SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
            , SUM(FS.Profit) SumOfProfit
    FROM delta.`Tables/dbo/fact_sale` FS
    INNER JOIN delta.`Tables/dbo/dimension_date` DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN delta.`Tables/dbo/dimension_employee` DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    """)
    
    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_employee")
    
  5. Oluşturulan tabloları doğrulamak için gezginde wwilakehouse lakehouse'a sağ tıklayın ve yenile'yi seçin. Toplama tabloları görüntülenir.

    Yeni tabloların nerede göründüğünü gösteren Lakehouse gezgininin ekran görüntüsü.

Bu öğreticide veriler Delta lake dosyaları olarak yazılır. Fabric bu tabloları otomatik olarak bulur ve meta veri deposuna kaydeder, bu nedenle ayrı CREATE TABLE ifadeler çalıştırmanız gerekmez.

Sonraki adım