Bagikan melalui


Tutorial Lakehouse: Menyiapkan dan mengubah data di lakehouse

Dalam tutorial ini, Anda menggunakan notebook dengan runtime Spark untuk mengubah dan menyiapkan data mentah di lakehouse Anda.

Prasyarat

Jika Anda tidak memiliki lakehouse yang berisi data, Anda harus:

Siapkan data

Dari langkah-langkah tutorial sebelumnya, kami memiliki data mentah yang diserap dari sumber ke bagian File lakehouse. Sekarang Anda dapat mengubah data tersebut dan menyiapkannya untuk membuat tabel Delta.

  1. Unduh buku catatan dari folder Kode Sumber Tutorial Lakehouse.

  2. Dari pengalih yang terletak di kiri bawah layar, pilih Rekayasa Data.

    Cuplikan layar memperlihatkan tempat menemukan pengalih dan memilih Rekayasa Data.

  3. Pilih Impor buku catatan dari bagian Baru di bagian atas halaman arahan.

  4. Pilih Unggah dari panel Status impor yang terbuka di sisi kanan layar.

  5. Pilih semua buku catatan yang Anda unduh di langkah pertama bagian ini.

    Cuplikan layar memperlihatkan tempat menemukan buku catatan yang diunduh dan tombol Buka.

  6. Pilih Buka. Pemberitahuan yang menunjukkan status impor muncul di sudut kanan atas jendela browser.

  7. Setelah impor berhasil, buka tampilan item ruang kerja dan lihat buku catatan yang baru diimpor. Pilih wwilakehouse lakehouse untuk membukanya.

    Cuplikan layar memperlihatkan daftar buku catatan yang diimpor dan tempat memilih lakehouse.

  8. Setelah wwilakehouse lakehouse dibuka, pilih Buka buku catatan buku catatan> yang sudah ada dari menu navigasi atas.

    Cuplikan layar memperlihatkan daftar buku catatan yang berhasil diimpor.

  9. Dari daftar buku catatan yang sudah ada, pilih buku catatan 01 - Buat Tabel Delta dan pilih Buka.

  10. Di buku catatan terbuka di Lakehouse Explorer, Anda melihat buku catatan sudah ditautkan ke lakehouse yang dibuka.

    Catatan

    Fabric menyediakan kemampuan urutan V untuk menulis file danau Delta yang dioptimalkan. Urutan V sering meningkatkan kompresi tiga hingga empat kali, dan hingga 10 kali, akselerasi performa melalui file Delta Lake yang tidak dioptimalkan. Spark in Fabric secara dinamis mengoptimalkan partisi sambil menghasilkan file dengan ukuran default 128 MB. Ukuran file target dapat diubah per persyaratan beban kerja menggunakan konfigurasi.

    Dengan kemampuan tulis optimalkan, mesin Apache Spark mengurangi jumlah file yang ditulis dan bertujuan untuk meningkatkan ukuran file individual dari data tertulis.

  11. Sebelum Anda menulis data sebagai tabel delta lake di bagian Tabel lakehouse, Anda menggunakan dua fitur Fabric (Urutan V dan Optimalkan Tulis) untuk penulisan data yang dioptimalkan dan untuk meningkatkan performa membaca. Untuk mengaktifkan fitur ini dalam sesi Anda, atur konfigurasi ini di sel pertama buku catatan Anda.

    Untuk memulai buku catatan dan menjalankan semua sel secara berurutan, pilih Jalankan semua di pita atas (di bawah Beranda). Atau, untuk hanya menjalankan kode dari sel tertentu, pilih ikon Jalankan yang muncul di sebelah kiri sel saat melayang, atau tekan SHIFT + ENTER di keyboard Anda saat kontrol berada di sel.

    Cuplikan layar konfigurasi sesi Spark, termasuk sel kode dan ikon Jalankan.

    Saat menjalankan sel, Anda tidak perlu menentukan kumpulan Spark atau detail kluster yang mendasar karena Fabric menyediakannya melalui Live Pool. Setiap ruang kerja Fabric dilengkapi dengan kumpulan Spark default, yang disebut Live Pool. Ini berarti saat Membuat notebook, Anda tidak perlu khawatir menentukan konfigurasi Spark atau detail kluster apa pun. Saat Anda menjalankan perintah notebook pertama, kumpulan langsung aktif dan berjalan dalam beberapa detik. Dan sesi Spark dibuat dan mulai mengeksekusi kode. Eksekusi kode berikutnya hampir seketika di notebook ini saat sesi Spark aktif.

  12. Selanjutnya, Anda membaca data mentah dari bagian File di lakehouse, dan menambahkan lebih banyak kolom untuk bagian tanggal yang berbeda sebagai bagian dari transformasi. Terakhir, Anda menggunakan partisi By Spark API untuk mempartisi data sebelum menulisnya sebagai format tabel Delta berdasarkan kolom bagian data yang baru dibuat (Tahun dan Kuartal).

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. Setelah tabel fakta dimuat, Anda dapat melanjutkan ke memuat data untuk dimensi lainnya. Sel berikut membuat fungsi untuk membaca data mentah dari bagian File lakehouse untuk setiap nama tabel yang diteruskan sebagai parameter. Selanjutnya, ini membuat daftar tabel dimensi. Terakhir, tabel mengulangi daftar tabel dan membuat tabel Delta untuk setiap nama tabel yang dibaca dari parameter input. Perhatikan bahwa skrip menghilangkan kolom bernama Photo dalam contoh ini karena kolom tidak digunakan.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. Untuk memvalidasi tabel yang dibuat, klik kanan dan pilih refresh di lakehouse wwilakehouse . Tabel muncul.

    Cuplikan layar memperlihatkan tempat menemukan tabel yang Anda buat di penjelajah Lakehouse.

  15. Buka tampilan item ruang kerja lagi dan pilih danau wwilakehouse untuk membukanya.

  16. Sekarang, buka buku catatan kedua. Di tampilan lakehouse, pilih Buka buku catatan> yang Sudah Ada dari pita.

  17. Dari daftar buku catatan yang sudah ada, pilih buku catatan 02 - Transformasi Data - Bisnis untuk membukanya.

    Cuplikan layar menu Buka buku catatan yang sudah ada, memperlihatkan tempat untuk memilih buku catatan Anda.

  18. Di buku catatan terbuka di Lakehouse Explorer, Anda melihat buku catatan sudah ditautkan ke lakehouse yang dibuka.

  19. Organisasi mungkin memiliki teknisi data yang bekerja dengan Scala/Python dan teknisi data lainnya yang bekerja dengan SQL (Spark SQL atau T-SQL), semuanya mengerjakan salinan data yang sama. Fabric memungkinkan grup yang berbeda ini, dengan pengalaman dan preferensi yang bervariasi, untuk bekerja dan berkolaborasi. Dua pendekatan yang berbeda mengubah dan menghasilkan agregat bisnis. Anda dapat memilih yang cocok untuk Anda atau mencampur dan mencocokkan pendekatan ini berdasarkan preferensi Anda tanpa mengorbankan performa:

    • Pendekatan #1 - Gunakan PySpark untuk menggabungkan dan menggabungkan data untuk menghasilkan agregat bisnis. Pendekatan ini lebih disukai seseorang dengan latar belakang pemrograman (Python atau PySpark).

    • Pendekatan #2 - Gunakan Spark SQL untuk menggabungkan dan menggabungkan data untuk menghasilkan agregat bisnis. Pendekatan ini lebih disukai seseorang dengan latar belakang SQL, beralih ke Spark.

  20. Pendekatan #1 (sale_by_date_city) - Gunakan PySpark untuk menggabungkan dan menggabungkan data untuk menghasilkan agregat bisnis. Dengan kode berikut, Anda membuat tiga kerangka data Spark yang berbeda, masing-masing mereferensikan tabel Delta yang ada. Kemudian Anda menggabungkan tabel ini menggunakan kerangka data, lakukan kelompokkan menurut untuk menghasilkan agregasi, mengganti nama beberapa kolom, dan akhirnya menulisnya sebagai tabel Delta di bagian Tabel lakehouse untuk bertahan dengan data.

    Dalam sel ini, Anda membuat tiga kerangka data Spark yang berbeda, masing-masing mereferensikan tabel Delta yang ada.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    Dalam sel ini, Anda menggabungkan tabel ini menggunakan kerangka data yang dibuat sebelumnya, melakukan kelompokkan menurut untuk menghasilkan agregasi, mengganti nama beberapa kolom, dan akhirnya menulisnya sebagai tabel Delta di bagian Tabel lakehouse.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. Pendekatan #2 (sale_by_date_employee) - Gunakan Spark SQL untuk bergabung dan menggabungkan data untuk menghasilkan agregat bisnis. Dengan kode berikut, Anda membuat tampilan Spark sementara dengan menggabungkan tiga tabel, mengelompokkan menurut untuk menghasilkan agregasi, dan mengganti nama beberapa kolom. Terakhir, Anda membaca dari tampilan Spark sementara dan akhirnya menulisnya sebagai tabel Delta di bagian Tabel lakehouse untuk bertahan dengan data.

    Dalam sel ini, Anda membuat tampilan Spark sementara dengan menggabungkan tiga tabel, mengelompokkan menurut untuk menghasilkan agregasi, dan mengganti nama beberapa kolom.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    Dalam sel ini, Anda membaca dari tampilan Spark sementara yang dibuat di sel sebelumnya dan akhirnya menulisnya sebagai tabel Delta di bagian Tabel lakehouse.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. Untuk memvalidasi tabel yang dibuat, klik kanan dan pilih Refresh di lakehouse wwilakehouse . Tabel agregat muncul.

    Cuplikan layar penjelajah Lakehouse memperlihatkan tempat tabel baru muncul.

Kedua pendekatan tersebut menghasilkan hasil yang sama. Untuk meminimalkan kebutuhan Anda untuk mempelajari teknologi baru atau kompromi pada performa, pilih pendekatan yang paling sesuai dengan latar belakang dan preferensi Anda.

Anda mungkin melihat bahwa Anda menulis data sebagai file Delta lake. Fitur penemuan tabel otomatis dan pendaftaran Fabric mengambil dan mendaftarkannya di metastore. Anda tidak perlu secara eksplisit memanggil CREATE TABLE pernyataan untuk membuat tabel yang akan digunakan dengan SQL.

Langkah selanjutnya