Bagikan melalui


Mengisi ulang data historis dengan alur

Dalam rekayasa data, pengisian ulang mengacu pada proses pemrosesan data historis secara retroaktif melalui alur data yang dirancang untuk memproses data saat ini atau streaming.

Biasanya, ini adalah alur terpisah yang mengirim data ke dalam tabel yang ada. Ilustrasi berikut menunjukkan aliran pengisian ulang yang mengirim data historis ke tabel perunggu dalam pipa saluran Anda.

Aliran isi ulang menambahkan data historis ke alur kerja yang sudah ada

Beberapa skenario yang mungkin memerlukan isi ulang:

  • Proses data historis dari sistem warisan untuk melatih model pembelajaran mesin (ML) atau membangun dasbor analisis tren historis.
  • Memproses ulang subkumpulan data karena masalah kualitas data dengan sumber data hulu.
  • Persyaratan bisnis Anda berubah dan Anda perlu mengisi ulang data untuk periode waktu yang berbeda yang tidak tercakup oleh alur awal.
  • Logika bisnis Anda berubah dan Anda perlu memproses ulang data historis dan saat ini.

Isi ulang di Lakeflow Spark Declarative Pipelines didukung dengan alur tambahan khusus yang menggunakan ONCE opsi . Lihat append_flow atau CREATE FLOW (pipelines) untuk informasi selengkapnya tentang opsi tersebut ONCE .

Pertimbangan saat mengisi ulang data historis ke dalam tabel streaming

  • Biasanya, tambahkan data ke tabel streaming perunggu. Lapisan perak dan emas hilir akan mengambil data baru dari lapisan perunggu.
  • Pastikan alur Anda dapat menangani data duplikat dengan baik jika data yang sama ditambahkan beberapa kali.
  • Pastikan skema data historis kompatibel dengan skema data saat ini.
  • Pertimbangkan ukuran volume data dan waktu pemrosesan SLA yang diperlukan, dan dengan demikian mengonfigurasi ukuran kluster dan batch.

Contoh: Menambahkan isi ulang ke alur yang sudah ada

Dalam contoh ini, katakanlah Anda memiliki alur yang menyerap data pendaftaran peristiwa mentah dari sumber penyimpanan cloud, mulai 01 Jan 2025. Anda kemudian menyadari bahwa Anda ingin menambahkan data historis tiga tahun sebelumnya untuk keperluan pelaporan dan analisis data hilir. Semua data berada dalam satu lokasi, dipartisi menurut tahun, bulan, dan hari, dalam format JSON.

Alur awal

Berikut adalah kode alur awal yang secara bertahap menyerap data pendaftaran peristiwa mentah dari penyimpanan cloud.

Phyton

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dp.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
    )

SQL

-- create a streaming table and the default flow to ingest streaming events
CREATE OR REFRESH STREAMING LIVE TABLE registration_events_raw AS
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025'; -- safeguard to not process data before begin_year

Di sini kami menggunakan modifiedAfter opsi Auto Loader untuk memastikan kami tidak memproses semua data dari jalur penyimpanan cloud. Pemrosesan bertahap dihentikan pada batas itu.

Petunjuk / Saran

Sumber data lainnya, seperti Kafka, Kinesis, dan Azure Event Hubs memiliki opsi pembaca yang setara untuk mencapai perilaku yang sama.

Mengisi ulang data dari 3 tahun sebelumnya

Sekarang Anda ingin menambahkan satu atau beberapa alur untuk mengisi ulang data sebelumnya. Dalam contoh ini, lakukan langkah-langkah berikut:

  • Gunakan alur append once. Ini melakukan pengisian ulang satu kali tanpa terus berjalan setelah isi ulang pertama. Kode tetap berada di alur kerja Anda, dan jika alur kerja dimuat ulang sepenuhnya, pemulihan data akan dijalankan kembali.
  • Buat tiga aliran pengisian ulang, satu untuk setiap tahun (dalam hal ini, data dibagi per tahun di jalur). Untuk Python, kami membuat parameter pembuatan alur, tetapi di SQL kami mengulangi kode tiga kali, sekali untuk setiap alur.

Bila Anda mengerjakan proyek sendiri dan tidak menggunakan komputasi tanpa server, Anda mungkin ingin memperbarui pekerja maksimum untuk jalur pemrosesan. Meningkatkan jumlah pekerja maksimum memastikan Anda memiliki sumber daya untuk memproses data historis sekaligus memproses data streaming saat ini sesuai SLA yang diharapkan.

Petunjuk / Saran

Jika Anda menggunakan komputasi tanpa server dengan penskalaan otomatis yang ditingkatkan (default), maka kluster Anda secara otomatis meningkat ukurannya saat beban Anda meningkat.

Phyton

from pyspark import pipelines as dp

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
    backfill_path = f"{source_root_path}/year={year}/*/*"
    @dp.append_flow(
        target="registration_events_raw",
        once=True,
        name=f"flow_registration_events_raw_backfill_{year}",
        comment=f"Backfill {year} Raw registration events")
    def backfill():
        return (
            spark
            .read
            .format("json")
            .option("inferSchema", "true")
            .load(backfill_path)
        )

# create the streaming table
dp.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dp.append_flow(
        target="registration_events_raw",
        name="flow_registration_events_raw_incremental",
        comment="Raw registration events")
def ingest():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}")
    )

# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
    setup_backfill_flow(year) # call the previously defined append_flow for each year

SQL

-- create the streaming table
CREATE OR REFRESH STREAMING TABLE registration_events_raw;

-- append the original incremental, streaming flow
CREATE FLOW
  registration_events_raw_incremental
AS INSERT INTO
  registration_events_raw BY NAME
SELECT * FROM STREAM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/*/*/*",
  format => "json",
  inferColumnTypes => true,
  maxFilesPerTrigger => 100,
  schemaEvolutionMode => "addNewColumns",
  modifiedAfter => "2024-12-31T23:59:59.999+00:00"
)
WHERE year(timestamp) >= '2025';


-- one time backfill 2024
CREATE FLOW
  registration_events_raw_backfill_2024
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2024/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2023
CREATE FLOW
  registration_events_raw_backfill_2023
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2023/*/*",
  format => "json",
  inferColumnTypes => true
);

-- one time backfill 2022
CREATE FLOW
  registration_events_raw_backfill_2022
AS INSERT INTO ONCE
  registration_events_raw BY NAME
SELECT * FROM read_files(
  "/Volumes/gc/demo/apps_raw/event_registration/year=2022/*/*",
  format => "json",
  inferColumnTypes => true
);

Implementasi ini menyoroti beberapa pola penting.

Pemisahan kekhawatiran

  • Pemrosesan inkremental tidak bergantung pada operasi backfill.
  • Setiap alur memiliki pengaturan konfigurasi dan pengoptimalannya sendiri.
  • Ada perbedaan yang jelas antara operasi inkremental dan isi ulang.

Eksekusi terkontrol

  • Menggunakan opsi ONCE memastikan bahwa setiap pemenuhan kembali berjalan persis sekali.
  • Aliran isi ulang tetap berada di grafik alur, tetapi menjadi tidak aktif setelah selesai. Ini siap secara otomatis untuk digunakan pada pembaruan penuh.
  • Ada jejak audit yang jelas dari operasi isi ulang dalam definisi alur.

Pengoptimalan pemrosesan

  • Anda dapat membagi isi ulang besar menjadi beberapa backfill yang lebih kecil untuk pemrosesan yang lebih cepat, atau untuk mengontrol pemrosesan.
  • Menggunakan autoscaling yang ditingkatkan secara dinamis menskalakan ukuran kluster berdasarkan beban kluster saat ini.

Evolusi skema

  • Menggunakan schemaEvolutionMode="addNewColumns" menangani perubahan skema dengan lancar.
  • Anda memiliki inferensi skema yang konsisten di seluruh data historis dan saat ini.
  • Ada penanganan kolom baru yang aman dalam data yang lebih baru.

Sumber daya tambahan