Bagikan melalui


Memuat data dengan Delta Live Tables

Anda dapat memuat data dari sumber data apa pun yang didukung oleh Apache Spark di Azure Databricks dengan menggunakan Delta Live Tables. Anda dapat menentukan himpunan data (tabel dan tampilan) di Delta Live Tables terhadap kueri apa pun yang menghasilkan Spark DataFrame, termasuk streaming DataFrames dan Pandas untuk Spark DataFrames. Untuk tugas penyerapan data, Databricks merekomendasikan untuk menggunakan tabel streaming untuk sebagian besar kasus penggunaan. Tabel streaming baik untuk menyerap data dari penyimpanan objek cloud dengan menggunakan Auto Loader atau dari bus pesan seperti Kafka. Contoh di bawah ini mendemonstrasikan beberapa pola umum.

Penting

Tidak semua sumber data memiliki dukungan SQL. Anda dapat mencampur buku catatan SQL dan Python dalam alur Delta Live Tables untuk menggunakan SQL untuk semua operasi di luar penyerapan.

Untuk detail tentang bekerja dengan pustaka yang tidak dipaketkan dalam Tabel Langsung Delta secara default, lihat Mengelola dependensi Python untuk alur Tabel Langsung Delta.

Memuat file dari penyimpanan objek cloud

Databricks merekomendasikan penggunaan Auto Loader dengan Tabel Langsung Delta untuk sebagian besar tugas penyerapan data dari penyimpanan objek cloud. Auto Loader dan Delta Live Table dirancang untuk memuat data yang terus bertambah secara bertahap dan idempotensi saat tiba di penyimpanan cloud. Contoh berikut menggunakan Auto Loader untuk membuat himpunan data dari file CSV dan JSON:

Catatan

Untuk memuat file dengan Auto Loader dalam alur dengan dukungan Unity Catalog, Anda harus menggunakan lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Lihat Apa itu Sintaks Auto Loader? dan Auto Loader SQL.

Peringatan

Jika Anda menggunakan Auto Loader dengan pemberitahuan file dan menjalankan refresh penuh untuk alur atau tabel streaming, Anda harus membersihkan sumber daya Anda secara manual. Anda dapat menggunakan CloudFilesResourceManager di notebook untuk melakukan pembersihan.

Memuat data dari bus pesan

Anda dapat mengonfigurasi alur Delta Live Tables untuk menyerap data dari bus pesan dengan tabel streaming. Databricks merekomendasikan untuk menggabungkan tabel streaming dengan eksekusi berkelanjutan dan penskalaan otomatis yang ditingkatkan untuk memberikan penyerapan yang paling efisien untuk pemuatan latensi rendah dari bus pesan. Lihat Mengoptimalkan pemanfaatan kluster alur Delta Live Tables dengan Penskalaan Otomatis yang Ditingkatkan.

Misalnya, kode berikut mengonfigurasi tabel streaming untuk menyerap data dari Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Anda dapat menulis operasi hilir di SQL murni untuk melakukan transformasi streaming pada data ini, seperti dalam contoh berikut:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Untuk contoh bekerja dengan Azure Event Hubs, lihat Menggunakan Azure Event Hubs sebagai sumber data Delta Live Tables.

Lihat Mengonfigurasi sumber data streaming.

Memuat data dari sistem eksternal

Tabel Langsung Delta mendukung pemuatan data dari sumber data apa pun yang didukung oleh Azure Databricks. Lihat Menyambungkan ke sumber data. Anda juga dapat memuat data eksternal menggunakan Federasi Lakehouse untuk sumber data yang didukung. Karena Lakehouse Federation memerlukan Databricks Runtime 13.3 LTS atau lebih tinggi, untuk menggunakan Federasi Lakehouse, alur Anda harus dikonfigurasi untuk menggunakan saluran pratinjau.

Beberapa sumber data tidak memiliki dukungan yang setara di SQL. Jika Anda tidak dapat menggunakan Federasi Lakehouse dengan salah satu sumber data ini, Anda bisa menggunakan buku catatan Python mandiri untuk menyerap data dari sumbernya. Buku catatan ini kemudian dapat ditambahkan sebagai pustaka sumber dengan buku catatan SQL untuk membangun alur Tabel Langsung Delta. Contoh berikut mendeklarasikan tampilan materialisasi untuk mengakses status data saat ini dalam tabel PostgreSQL jarak jauh:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Memuat himpunan data kecil atau statis dari penyimpanan objek cloud

Anda dapat memuat himpunan data kecil atau statis menggunakan sintaks beban Apache Spark. Tabel Langsung Delta mendukung semua format file yang didukung oleh Apache Spark di Azure Databricks. Untuk daftar lengkapnya, lihat Opsi format data.

Contoh berikut menunjukkan pemuatan JSON untuk membuat tabel Tabel Langsung Delta:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Catatan

Konstruksi SELECT * FROM format.`path`; SQL umum untuk semua lingkungan SQL di Azure Databricks. Ini adalah pola yang direkomendasikan untuk akses file langsung menggunakan SQL dengan Delta Live Tables.

Mengakses kredensial penyimpanan dengan aman dengan rahasia dalam alur

Anda dapat menggunakan rahasia Azure Databricks untuk menyimpan kredensial seperti kunci akses atau kata sandi. Untuk mengonfigurasi rahasia di alur Anda, gunakan properti Spark di konfigurasi kluster pengaturan alur. Lihat Mengonfigurasi pengaturan komputasi Anda.

Contoh berikut menggunakan rahasia untuk menyimpan kunci akses yang diperlukan untuk membaca data input dari akun penyimpanan Azure Data Lake Storage Gen2 (ADLS Gen2) menggunakan Auto Loader. Anda dapat menggunakan metode yang sama ini untuk mengonfigurasi rahasia apa pun yang diperlukan oleh alur Anda, misalnya, kunci AWS untuk mengakses S3, atau kata sandi ke metastore Apache Hive.

Untuk mempelajari selengkapnya tentang bekerja dengan Azure Data Lake Storage Gen2, lihat Menyambungkan ke Azure Data Lake Storage Gen2 dan Blob Storage.

Catatan

Anda harus menambahkan awalan spark.hadoop. ke kunci konfigurasi spark_conf yang menetapkan nilai rahasia.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Menggantikan

  • <storage-account-name> dengan nama akun penyimpanan ADLS Gen2.
  • <scope-name> dengan nama cakupan rahasia Azure Databricks.
  • <secret-name> dengan nama kunci yang berisi kunci akses akun penyimpanan Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Menggantikan

  • <container-name> dengan nama kontainer akun penyimpanan Azure yang menyimpan data masukan.
  • <storage-account-name> dengan nama akun penyimpanan ADLS Gen2.
  • <path-to-input-dataset> dengan jalur ke himpunan data masukan.

Memuat data dari Azure Event Hubs

Azure Event Hubs adalah layanan streaming data yang menyediakan antarmuka yang kompatibel dengan Apache Kafka. Anda dapat menggunakan konektor Kafka Streaming Terstruktur, yang disertakan dalam runtime Delta Live Tables, untuk memuat pesan dari Azure Event Hubs. Untuk mempelajari selengkapnya tentang memuat dan memproses pesan dari Azure Event Hubs, lihat Menggunakan Azure Event Hubs sebagai sumber data Tabel Langsung Delta.