Lakehouse öğreticisi: Lakehouse'da verileri hazırlama ve dönüştürme

Bu öğreticide, verileri dönüştürmek ve hazırlamak için Spark çalışma zamanı ile not defterlerini kullanacaksınız.

Önkoşullar

Verileri hazırlama

Önceki öğretici adımlarından, kaynaktan lakehouse'un Dosyalar bölümüne alınan ham veriler var. Artık bu verileri dönüştürebilir ve delta tabloları oluşturmaya hazırlayabilirsiniz.

  1. Not defterlerini Lakehouse Öğreticisi Kaynak Kodu klasöründen indirin.

  2. Ekranın sol alt kısmında bulunan deneyim değiştiriciden Veri Madenciliği'ı seçin.

    Screenshot showing where to find the experience switcher and select Data Engineering.

  3. Giriş sayfasının üst kısmındaki Yeni bölümünden Not defterini içeri aktar'ı seçin.

  4. Ekranın sağ tarafında açılan İçeri aktarma durumu bölmesinden Karşıya Yükle'yi seçin.

  5. Bu bölümün 1. adımında indirilen tüm not defterlerini seçin.

    Screenshot showing where to find the downloaded notebooks and the Open button.

  6. 'ı seçin. tarayıcı penceresinin sağ üst köşesinde içeri aktarmanın durumunu belirten bir bildirim görüntülenir.

  7. İçeri aktarma işlemi başarılı olduktan sonra çalışma alanının öğeler görünümüne gidebilir ve yeni içeri aktarılan not defterlerini görebilirsiniz. Wwilakehouse lakehouse'u seçerek açın.

    Screenshot showing the list of imported notebooks and where to select the lakehouse.

  8. Wwilakehouse lakehouse açıldıktan sonra üst gezinti menüsünden Not defterini>aç Mevcut not defterini aç'ı seçin.

    Screenshot showing the list of successfully imported notebooks.

  9. Mevcut not defterleri listesinden 01 - Delta Tabloları Oluştur not defterini seçin ve Aç'ı seçin.

  10. Göl evi Gezgini'ndeki açık not defterinde, not defterinin açılmış göl kutunuza zaten bağlı olduğunu görürsünüz.

    Not

    Doku, iyileştirilmiş delta lake dosyaları yazmak için V sırası özelliği sağlar. V sırası genellikle en iyi duruma getirilmemiş Delta Lake dosyalarına göre sıkıştırmayı üç-dört kat ve performans hızlandırmasını 10 kata kadar artırır. Dokuda Spark, varsayılan 128 MB boyutuna sahip dosyalar oluştururken bölümleri dinamik olarak iyileştirir. Hedef dosya boyutu, yapılandırmalar kullanılarak iş yükü gereksinimleri başına değiştirilebilir. Yazma özelliğini iyileştirme özelliğiyle, yazılan dosya sayısını azaltan ve yazılan verilerin tek tek dosya boyutunu artırmayı hedefleyen Apache Spark altyapısı.

  11. Lakehouse'un Tablolar bölümünde delta lake tabloları olarak veri yazmadan önce, iyileştirilmiş veri yazma ve gelişmiş okuma performansı için iki Doku özelliği (V sırası ve Yazmayı İyileştir) kullanırsınız. Bu özellikleri oturumunuzda etkinleştirmek için bu yapılandırmaları not defterinizin ilk hücresinde ayarlayın.

    Not defterini başlatmak ve tüm hücreleri sırayla yürütmek için üst şeritte (Giriş altında) Tümünü çalıştır'ı seçin. Ya da yalnızca belirli bir hücreden kod yürütmek için, üzerine gelindiğinde hücrenin solunda görünen Çalıştır simgesini seçin veya denetim hücredeyken klavyenizde SHIFT + ENTER tuşlarına basın.

    Screenshot of a Spark session configuration screen, including a code cell and Run icon.

    Bir hücreyi çalıştırırken, Doku bunları Canlı Havuz aracılığıyla sağladığından temel Spark havuzunu veya küme ayrıntılarını belirtmeniz gerekmezdi. Her Doku çalışma alanı, Canlı Havuz adlı varsayılan bir Spark havuzuyla birlikte gelir. Bu, not defterleri oluşturduğunuzda Spark yapılandırmalarını veya küme ayrıntılarını belirtme konusunda endişelenmeniz gerekmeyecek anlamına gelir. İlk not defteri komutunu yürütürken, canlı havuz birkaç saniye içinde çalışır durumda olur. Spark oturumu oluşturulur ve kodu yürütmeye başlar. Spark oturumu etkinken sonraki kod yürütme işlemi bu not defterinde neredeyse anında kullanılmaktadır.

  12. Ardından, lakehouse'un Dosyalar bölümünden ham verileri okuyacak ve dönüştürmenin bir parçası olarak farklı tarih bölümleri için daha fazla sütun ekleyebilirsiniz. Son olarak, yeni oluşturulan veri bölümü sütunlarına (Yıl ve Çeyrek) göre delta tablosu olarak yazmadan önce verileri bölümlendirmek için partitionBy Spark API'sini kullanırsınız.

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. Olgu tabloları yüklendikten sonra, diğer boyutların verilerini yüklemeye geçebilirsiniz. Aşağıdaki hücre, parametre olarak geçirilen tablo adlarının her biri için lakehouse'un Dosyalar bölümünden ham verileri okumak için bir işlev oluşturur. Ardından boyut tablolarının listesini oluşturur. Son olarak, tablo listesinde döngü oluşturur ve giriş parametresinden okunan her tablo adı için bir delta tablosu oluşturur. Bu örnekte kullanılan sütun kullanılmadığından betiğin adlı Photo sütunu bırakacağını unutmayın.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. Oluşturulan tabloları doğrulamak için wwilakehouse lakehouse'da sağ tıklayın ve yenile'yi seçin. Tablolar görüntülenir.

    Screenshot showing where to find your created tables in the Lakehouse explorer.

  15. Çalışma alanının öğeler görünümüne tekrar gidin ve wwilakehouse lakehouse'u seçerek açın.

  16. Şimdi ikinci not defterini açın. Lakehouse görünümünde şeritten Not defterini>aç Varolan not defterini aç'ı seçin.

  17. Mevcut not defterleri listesinden 02 - Veri Dönüştürme - İş not defterini seçerek açın.

    Screenshot of the Open existing notebook menu, showing where to select your notebook.

  18. Göl evi Gezgini'ndeki açık not defterinde, not defterinin açılmış göl kutunuza zaten bağlı olduğunu görürsünüz.

  19. Bir kuruluşta Scala/Python ile çalışan veri mühendisleri ve SQL (Spark SQL veya T-SQL) ile çalışan diğer veri mühendisleri olabilir ve bunların hepsi aynı veri kopyası üzerinde çalışır. Doku, farklı deneyime ve tercihe sahip bu farklı grupların çalışmasını ve işbirliği yapmasına olanak tanır. İki farklı yaklaşım, iş toplamlarını dönüştürür ve oluşturur. Sizin için uygun olanı seçebilir veya performanstan ödün vermeden tercihinize göre bu yaklaşımları karıştırıp eşleştirebilirsiniz:

    • Yaklaşım 1 - İş toplamları oluşturmak üzere verileri birleştirmek ve toplamak için PySpark'ı kullanın. Bu yaklaşım, programlama (Python veya PySpark) arka planı olan biri için tercih edilir.

    • Yaklaşım 2 - Spark SQL'i kullanarak iş toplamları oluşturmak için verileri birleştirin ve toplar. Bu yaklaşım, Spark'a geçiş yaparak SQL arka planına sahip biri için tercih edilir.

  20. Yaklaşım 1 (sale_by_date_city) - İş toplamları oluşturmak için verileri birleştirmek ve toplamak için PySpark'ı kullanın. Aşağıdaki kodla, her birinde mevcut bir delta tablosuna başvuran üç farklı Spark veri çerçevesi oluşturursunuz. Ardından veri çerçevelerini kullanarak bu tabloları birleştirir, toplama oluşturmak için gruplandırır, sütunlardan birkaçını yeniden adlandırır ve son olarak verileri kalıcı hale getirmek için lakehouse'un Tablolar bölümüne delta tablosu olarak yazarsınız.

    Bu hücrede, her birinde var olan bir delta tablosuna başvuran üç farklı Spark veri çerçevesi oluşturursunuz.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    Bu hücrede, daha önce oluşturulan veri çerçevelerini kullanarak bu tabloları birleştirecek, toplama oluşturmak için gruplandıracak, sütunlardan birkaçını yeniden adlandıracak ve son olarak lakehouse'un Tablolar bölümüne delta tablosu olarak yazacaksınız.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. Yaklaşım 2 (sale_by_date_employee) - İş toplamları oluşturmak için verileri birleştirmek ve toplamak için Spark SQL'i kullanın. Aşağıdaki kodla, üç tabloyu birleştirerek geçici bir Spark görünümü oluşturur, toplama oluşturmak için gruplandırır ve sütunlardan birkaçını yeniden adlandırırsınız. Son olarak, geçici Spark görünümünden okur ve son olarak verileri kalıcı hale getirmek için lakehouse'un Tablolar bölümünde delta tablosu olarak yazarsınız.

    Bu hücrede, üç tabloyu birleştirerek geçici bir Spark görünümü oluşturursunuz, toplama oluşturmak için gruplandırma yapar ve sütunlardan birkaçını yeniden adlandırırsınız.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    Bu hücrede, önceki hücrede oluşturulan geçici Spark görünümünden okuyacak ve son olarak bunu göl binasının Tablolar bölümünde delta tablosu olarak yazacaksınız.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. Oluşturulan tabloları doğrulamak için wwilakehouse lakehouse'da sağ tıklayın ve yenile'yi seçin. Toplama tabloları görüntülenir.

    Screenshot of the Lakehouse explorer showing where the new tables appear.

Her iki yaklaşım da benzer bir sonuç üretir. Yeni bir teknoloji öğrenme veya performansı tehlikeye atma gereksinimini en aza indirmek için arka planınıza ve tercihinize göre seçim yapabilirsiniz.

Ayrıca verileri delta lake dosyaları olarak yazdığınızı fark edebilirsiniz. Doku'nun otomatik tablo bulma ve kayıt özelliği bunları alır ve meta veri deposuna kaydeder. SQL ile kullanılacak tablolar oluşturmak için deyimleri açıkça çağırmanız CREATE TABLE gerekmez.

Sonraki adım