Bagikan melalui


Tutorial: Membangun alur ETL menggunakan capture data perubahan

Pelajari cara membuat dan menyebarkan alur ETL (ekstraksi, transformasi, dan pemuatan) dengan change data capture (CDC) menggunakan Lakeflow Spark Declarative Pipelines (SDP) untuk orkestrasi data dan Auto Loader. Alur ETL menerapkan langkah-langkah untuk membaca data dari sistem sumber, mengubah data tersebut berdasarkan persyaratan, seperti pemeriksaan kualitas data dan merekam de-duplikasi, dan menulis data ke sistem target, seperti gudang data atau data lake.

Dalam tutorial ini, Anda akan menggunakan data dari customers tabel dalam database MySQL untuk:

  • Ekstrak perubahan dari database transaksi menggunakan Debezium atau alat lain dan simpan ke penyimpanan objek cloud (S3, ADLS, atau GCS). Dalam tutorial ini, Anda melewati pengaturan sistem CDC eksternal dan sebaliknya menghasilkan data palsu untuk menyederhanakan tutorial.
  • Gunakan Auto Loader untuk memuat pesan secara bertahap dari penyimpanan objek cloud, dan menyimpan pesan mentah dalam customers_cdc tabel. Auto Loader menyimpulkan skema dan menangani evolusi skema.
  • customers_cdc_clean Buat tabel untuk memeriksa kualitas data menggunakan ekspektasi. Misalnya, id seharusnya tidak boleh null karena digunakan untuk menjalankan operasi upsert.
  • Lakukan AUTO CDC ... INTO pada data CDC yang dibersihkan untuk mengupsert perubahan ke dalam tabel akhir customers.
  • Perlihatkan bagaimana alur dapat membuat tabel dimensi (SCD2) tipe 2 yang berubah perlahan untuk melacak semua perubahan.

Tujuannya adalah untuk menyerap data mentah hampir secara real time dan membangun tabel untuk tim analis Anda sambil memastikan kualitas data.

Tutorial ini menggunakan arsitektur medali Lakehouse, di mana ia menyerap data mentah melalui lapisan perunggu, membersihkan dan memvalidasi data dengan lapisan perak, dan menerapkan pemodelan dimensi dan agregasi menggunakan lapisan emas. Lihat Apa arsitektur medali lakehouse? untuk informasi selengkapnya.

Alur yang diimplementasikan terlihat seperti ini:

Pipeline dengan CDC

Untuk informasi selengkapnya tentang alur, Auto Loader, dan CDC lihat Alur Deklaratif Lakeflow Spark, Apa itu Auto Loader?, dan Apa itu pengumpulan data perubahan (CDC)?

Persyaratan

Untuk menyelesaikan tutorial ini, Anda harus memenuhi persyaratan berikut:

Mengubah pengambilan data dalam alur ETL

Penangkapan perubahan data (CDC) adalah proses yang menangkap perubahan dalam catatan yang terjadi pada database transaksional (misalnya, MySQL atau PostgreSQL) atau gudang data. CDC menangkap operasi seperti penghapusan data, penambahan, dan pembaruan, biasanya sebagai aliran untuk membentuk kembali tabel di sistem eksternal. CDC memungkinkan pemuatan inkremental sambil menghilangkan kebutuhan akan pembaruan beban massal.

Nota

Untuk menyederhanakan tutorial ini, lewati pengaturan sistem CDC eksternal. Asumsikan ia menjalankan dan menyimpan data CDC sebagai file JSON di penyimpanan objek cloud (S3, ADLS, atau GCS). Tutorial ini menggunakan Faker pustaka untuk menghasilkan data yang digunakan dalam tutorial.

Menangkap CDC

Berbagai alat CDC tersedia. Salah satu solusi sumber terbuka terkemuka adalah Debezium, tetapi implementasi lain yang menyederhanakan sumber data ada, seperti Fivetran, Qlik Replicate, StreamSets, Talend, Oracle GoldenGate, dan AWS DMS.

Dalam tutorial ini, Anda menggunakan data CDC dari sistem eksternal seperti Debezium atau DMS. Debezium menangkap setiap baris data yang diubah. Biasanya mengirimkan riwayat perubahan data ke topik Kafka atau menyimpannya sebagai file.

Anda harus memuat informasi CDC dari customers tabel (format JSON), memeriksa keakuratannya, lalu mematerialisasi tabel pelanggan di Lakehouse.

Input CDC dari Debezium

Untuk setiap perubahan, Anda menerima pesan JSON yang berisi semua bidang baris yang sedang diperbarui (id, , firstnamelastname, email, address). Pesan ini juga menyertakan metadata tambahan:

  • operation: Kode operasi, biasanya (DELETE, APPEND, UPDATE).
  • operation_date: Tanggal dan cap waktu dari catatan untuk setiap aksi operasi.

Alat seperti Debezium dapat menghasilkan output yang lebih canggih, seperti nilai baris sebelum perubahan, tetapi tutorial ini menghilangkannya untuk kesederhanaan.

Langkah 1: Membuat alur

Buat alur ETL baru untuk mengkueri sumber data CDC Anda dan membuat tabel di ruang kerja Anda.

  1. Di ruang kerja Anda, klik ikon Plus. Baru di sudut kiri atas.

  2. Klik Pipeline ETL.

  3. Ubah judul alur menjadi Pipelines with CDC tutorial atau nama yang Anda inginkan.

  4. Di bawah judul, pilih katalog dan skema di mana Anda memiliki izin menulis.

    Katalog dan skema ini digunakan secara default, jika Anda tidak menentukan katalog atau skema dalam kode Anda. Kode Anda dapat menulis ke katalog atau skema apa pun dengan menentukan jalur lengkap. Tutorial ini menggunakan default yang Anda tentukan di sini.

  5. Dari Opsi tingkat lanjut, pilih Mulai dengan file kosong.

  6. Pilih folder untuk kode Anda. Anda dapat memilih Telusuri untuk menelusuri daftar folder di ruang kerja. Anda dapat memilih folder apa pun yang memiliki izin tulis.

    Untuk menggunakan kontrol versi, pilih folder Git. Jika Anda perlu membuat folder baru, pilih tombol Plus.

  7. Pilih Python atau SQL untuk bahasa file Anda, berdasarkan bahasa yang ingin Anda gunakan untuk tutorial.

  8. Klik Pilih untuk membuat alur dengan pengaturan ini dan buka Editor Alur Lakeflow.

Anda sekarang memiliki jalur tanpa isi dengan katalog dan skema default. Selanjutnya, siapkan data sampel untuk diimpor dalam tutorial.

Langkah 2: Buat data sampel untuk diimpor dalam tutorial ini

Langkah ini tidak diperlukan jika Anda mengimpor data Anda sendiri dari sumber yang ada. Untuk tutorial ini, hasilkan data palsu sebagai contoh untuk tutorial. Buat buku catatan untuk menjalankan skrip pembuatan data Python. Kode ini hanya perlu dijalankan sekali untuk menghasilkan data sampel, jadi buat dalam folder alur explorations , yang tidak dijalankan sebagai bagian dari pembaruan alur.

Nota

Kode ini menggunakan Faker untuk menghasilkan data CDC sampel. Faker tersedia untuk diinstal secara otomatis, sehingga tutorial menggunakan %pip install faker. Anda juga dapat mengatur ketergantungan pada faker untuk notebook. Lihat Menambahkan dependensi ke buku catatan.

  1. Dari dalam Editor Alur Lakeflow, di bar samping browser aset di sebelah kiri editor, klik ikon Plus.Tambahkan, lalu pilih Eksplorasi.

  2. Beri Nama, seperti Setup data, pilih Python. Anda dapat meninggalkan folder tujuan default, yang merupakan folder baru explorations .

  3. Klik Buat. Ini membuat buku catatan di folder baru.

  4. Masukkan kode berikut di sel pertama. Anda harus mengubah definisi <my_catalog> dan <my_schema> agar sesuai dengan katalog dan skema default yang Anda pilih di prosedur sebelumnya:

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. Untuk menghasilkan himpunan data yang digunakan dalam tutorial, ketik Shift + Enter untuk menjalankan kode:

  6. Optional. Untuk mempratinjau data yang digunakan dalam tutorial ini, masukkan kode berikut di sel berikutnya dan jalankan kode. Perbarui katalog dan skema agar sesuai dengan jalur dari kode sebelumnya.

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

Ini menghasilkan himpunan data besar (dengan data CDC palsu) yang dapat Anda gunakan di sisa tutorial. Pada langkah berikutnya, serap data menggunakan Auto Loader.

Langkah 3: Menyerap data secara bertahap dengan Auto Loader

Langkah selanjutnya adalah menyerap data mentah dari penyimpanan cloud (palsu) ke dalam lapisan perunggu.

Ini bisa menjadi tantangan karena beberapa alasan, karena Anda harus:

  • Beroperasi dalam skala besar, berpotensi menyerap jutaan file kecil.
  • Menentukan skema dan jenis JSON.
  • Tangani rekaman buruk dengan skema JSON yang tidak benar.
  • Mengurus evolusi skema (misalnya, kolom baru di tabel pelanggan).

Auto Loader menyederhanakan penyerapan ini, termasuk inferensi skema dan evolusi skema, sambil menskalakan ke jutaan file masuk. Auto Loader tersedia di Python menggunakan cloudFiles dan di SQL menggunakan SELECT * FROM STREAM read_files(...) dan dapat digunakan dengan berbagai format (JSON, CSV, Apache Avro, dll.):

Menentukan tabel sebagai tabel streaming menjamin bahwa Anda hanya menggunakan data masuk baru. Jika Anda tidak mendefinisikannya sebagai tabel streaming, ia memindai dan menyerap semua data yang tersedia. Lihat Tabel streaming untuk informasi selengkapnya.

  1. Untuk memuat data CDC yang masuk menggunakan Auto Loader, salin dan tempel kode berikut ke dalam file kode yang dibuat dengan pipeline Anda (disebut my_transformation.py). Anda dapat menggunakan Python atau SQL, berdasarkan bahasa yang Anda pilih saat membuat alur. Pastikan untuk mengganti <catalog> dan <schema> dengan yang telah Anda tetapkan sebagai default untuk alur pekerjaan.

    Phyton

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. Klik ikon Putar.Jalankan file atau Jalankan alur untuk memulai pembaruan untuk alur yang tersambung. Dengan hanya satu file sumber dalam alur Anda, ini setara secara fungsional.

Setelah pembaruan selesai, editor diperbarui dengan informasi tentang pipa Anda.

  • Grafik alur (DAG), di bar samping di sebelah kanan kode Anda, memperlihatkan satu tabel, customers_cdc_bronze.
  • Ringkasan pembaruan ditampilkan di bagian atas penjelajah sumber daya jalur.
  • Detail tabel yang dihasilkan diperlihatkan di panel bawah, dan Anda bisa menelusuri data dari tabel dengan memilihnya.

Ini adalah data lapisan perunggu mentah yang diimpor dari penyimpanan cloud. Pada langkah berikutnya, bersihkan data untuk membuat tabel lapisan perak.

Langkah 4: Pembersihan dan harapan untuk melacak kualitas data

Setelah lapisan perunggu ditentukan, buat lapisan perak dengan menambahkan harapan untuk mengontrol kualitas data. Periksa kondisi berikut:

  • ID tidak boleh berupa null.
  • Jenis operasi CDC harus valid.
  • JSON harus dibaca dengan benar oleh Auto Loader.

Baris yang tidak memenuhi kondisi ini dihapus.

Lihat Mengelola kualitas data dengan ekspektasi alur untuk informasi selengkapnya.

  1. Dari panel samping penjelajah aset pipeline, klik ikon Plus.Tambah, kemudian Transformasi.

  2. Masukkan Nama dan pilih bahasa (Python atau SQL) untuk file kode sumber. Anda dapat mencampur dan mencocokkan bahasa dalam alur, sehingga Anda dapat memilih salah satu untuk langkah ini.

  3. Untuk membuat lapisan perak dengan tabel yang dibersihkan dan memberlakukan batasan, salin dan tempel kode berikut ke dalam file baru (pilih Python atau SQL berdasarkan bahasa file).

    Phyton

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. Klik ikon Putar.Jalankan file atau Jalankan alur untuk memulai pembaruan untuk alur yang tersambung.

    Karena sekarang ada dua file sumber, ini tidak melakukan hal yang sama, tetapi dalam hal ini, outputnya sama.

    • Jalankan alur menjalankan seluruh alur Anda, termasuk kode dari langkah 3. Jika data input Anda sedang diperbarui, ini dapat menarik perubahan apa pun dari sumber tersebut ke lapisan perunggu Anda. Ini tidak menjalankan kode dari langkah penyiapan data, karena itu ada di folder eksplorasi, dan bukan bagian dari sumber untuk alur Anda.
    • Jalankan file hanya menjalankan file sumber saat ini. Dalam hal ini, tanpa data input Anda diperbarui, ini menghasilkan data perak dari tabel perunggu yang di-cache. Akan berguna untuk menjalankan hanya file ini untuk iterasi yang lebih cepat saat membuat atau mengedit kode alur Anda.

Ketika pembaruan selesai, Anda dapat melihat bahwa grafik alur sekarang menunjukkan dua tabel (dengan lapisan perak tergantung pada lapisan perunggu), dan panel bawah menunjukkan detail untuk kedua tabel. Bagian atas penjelajah aset alur kerja sekarang menunjukkan waktu eksekusi untuk beberapa kali pengulangan, tetapi hanya menampilkan detail untuk eksekusi terbaru.

Selanjutnya, buat versi akhir dari lapisan emas tabel customers Anda.

Langkah 5: Mematerialisasi tabel pelanggan dengan alur CDC otomatis

Hingga saat ini, tabel hanya meneruskan data CDC di setiap langkah. Sekarang, buat tabel customers yang berisi tampilan paling mutakhir dan menjadi replika tabel asli, bukan daftar operasi CDC yang digunakan untuk membuatnya.

Ini tidak mudah untuk diterapkan secara manual. Anda harus mempertimbangkan hal-hal seperti deduplikasi data untuk menyimpan data terbaru.

Namun, Lakeflow Spark Declarative Pipelines memecahkan tantangan ini dengan operasi AUTO CDC.

  1. Dari bilah sisi penjelajah aset pipeline, klik ikon Tambah.Tambahkan dan Transformasi.

  2. Masukkan Nama dan pilih bahasa (Python atau SQL) untuk file kode sumber baru. Anda dapat kembali memilih salah satu bahasa untuk langkah ini, tetapi menggunakan kode yang benar, di bawah ini.

  3. Untuk memproses data CDC menggunakan AUTO CDC di Lakeflow Spark Declarative Pipelines, salin dan tempel kode berikut ke dalam file baru.

    Phyton

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. Klik ikon Putar.Jalankan file untuk memulai pembaruan untuk alur yang tersambung.

Ketika pembaruan selesai, Anda dapat melihat bahwa grafik alur Anda menunjukkan 3 tabel, berkembang dari perunggu ke perak ke emas.

Langkah 6: Lacak riwayat pembaruan dengan dimensi tipe 2 (SCD2) yang berubah perlahan

Sering kali diperlukan untuk membuat tabel yang melacak semua perubahan yang dihasilkan dari APPEND, , UPDATEdan DELETE:

  • Riwayat: Anda ingin menyimpan riwayat semua perubahan pada tabel Anda.
  • Keterlacakan: Anda ingin melihat operasi mana yang terjadi.

SCD2 dengan Lakeflow SDP

Delta mendukung perubahan aliran data (CDF), dan table_change dapat mengkueri modifikasi tabel di SQL dan Python. Namun, kasus penggunaan utama CDF adalah menangkap perubahan dalam alur, bukan untuk membuat tampilan penuh perubahan tabel dari awal.

Hal-hal menjadi sangat kompleks untuk diterapkan jika Anda memiliki peristiwa yang tidak berurutan. Jika Anda harus mengurutkan perubahan dengan tanda waktu dan menerima modifikasi yang terjadi di masa lalu, Anda harus menambahkan entri baru dalam tabel SCD Anda dan memperbarui entri sebelumnya.

Lakeflow SDP menghapus kompleksitas ini dan memungkinkan Anda membuat tabel terpisah yang berisi semua modifikasi dari awal waktu. Tabel ini kemudian dapat digunakan dalam skala besar, dengan partisi tertentu atau kolom ZORDER jika diperlukan. Bidang yang tidak berurutan ditangani secara otomatis berdasarkan _sequence_by.

Untuk membuat tabel SCD2, gunakan opsi STORED AS SCD TYPE 2 di SQL atau stored_as_scd_type="2" di Python.

Nota

Anda juga dapat membatasi kolom mana yang dilacak fitur menggunakan opsi : TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. Dari bilah sisi penjelajah aset pipeline, klik ikon Tambah.Tambahkan dan Transformasi.

  2. Masukkan Nama dan pilih bahasa (Python atau SQL) untuk file kode sumber baru.

  3. Salin dan tempel kode berikut ke dalam file baru.

    Phyton

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. Klik ikon Putar.Jalankan file untuk memulai pembaruan untuk alur yang tersambung.

Ketika pembaruan selesai, grafik alur menyertakan tabel baru customers_history, yang juga bergantung pada tabel lapisan perak, dan panel bawah menunjukkan detail untuk semua 4 tabel.

Langkah 7: Buat tampilan materialisasi yang melacak siapa yang paling sering mengubah informasi mereka.

Tabel customers_history berisi semua perubahan historis yang telah dilakukan pengguna pada informasi mereka. Buat tampilan materialisasi sederhana di lapisan emas yang melacak siapa yang paling banyak mengubah informasi mereka. Ini dapat digunakan untuk analisis deteksi penipuan atau rekomendasi pengguna dalam skenario dunia nyata. Selain itu, menerapkan perubahan dengan SCD2 telah menghapus duplikat, sehingga Anda dapat langsung menghitung baris per ID pengguna.

  1. Dari bilah sisi penjelajah aset pipeline, klik ikon Tambah.Tambahkan dan Transformasi.

  2. Masukkan Nama dan pilih bahasa (Python atau SQL) untuk file kode sumber baru.

  3. Salin dan tempel kode berikut ke dalam file sumber baru.

    Phyton

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. Klik ikon Putar.Jalankan file untuk memulai pembaruan untuk alur yang tersambung.

Setelah pembaruan selesai, ada tabel baru dalam grafik alur yang bergantung pada customers_history tabel, dan Anda dapat melihatnya di panel bawah. Pipeline Anda sekarang telah selesai. Anda dapat mengujinya dengan melakukan Jalankan pipeline penuh. Satu-satunya langkah yang tersisa adalah menjadwalkan alur untuk diperbarui secara teratur.

Langkah 8: Buat pekerjaan untuk menjalankan alur ETL

Selanjutnya, buat alur kerja untuk mengotomatiskan langkah-langkah penyerapan, pemrosesan, dan analisis data di alur Anda menggunakan pekerjaan Databricks.

  1. Di bagian atas editor, pilih tombol Jadwalkan .
  2. Jika dialog Jadwal muncul, pilih Tambahkan jadwal.
  3. Ini membuka dialog Jadwal baru , di mana Anda dapat membuat pekerjaan untuk menjalankan alur Anda sesuai jadwal.
  4. Secara opsional, beri nama pekerjaan.
  5. Secara default, jadwal diatur untuk berjalan sekali per hari. Anda dapat menerima default ini, atau mengatur jadwal Anda sendiri. Memilih Tingkat Lanjut memberi Anda opsi untuk mengatur waktu tertentu yang akan dijalankan pekerjaan. Memilih Opsi lainnya memungkinkan Anda membuat pemberitahuan saat pekerjaan berjalan.
  6. Pilih Buat untuk menerapkan perubahan dan membuat pekerjaan.

Sekarang pekerjaan akan berjalan setiap hari untuk menjaga alur Anda tetap terbarui. Anda dapat memilih Jadwalkan lagi untuk melihat daftar jadwal. Anda dapat mengelola jadwal untuk alur Anda dari dialog tersebut, termasuk menambahkan, mengedit, atau menghapus jadwal.

Mengklik nama jadwal (atau pekerjaan) akan membawa Anda ke halaman pekerjaan di daftar Pekerjaan & alur . Dari sana Anda dapat melihat detail tentang eksekusi pekerjaan, termasuk riwayat eksekusi, atau segera menjalankan pekerjaan dengan tombol Jalankan sekarang .

Lihat Pemantauan dan observabilitas untuk Pekerjaan Lakeflow untuk informasi selengkapnya tentang pelaksanaan pekerjaan.

Sumber daya tambahan