Bagikan melalui


Memuat data dalam alur

Anda dapat memuat data dari sumber data apa pun yang didukung oleh Apache Spark di Azure Databricks menggunakan alur. Anda dapat menentukan himpunan data (tabel dan tampilan) dalam Alur Deklaratif Lakeflow Spark untuk setiap kueri yang mengembalikan Spark DataFrame, termasuk streaming DataFrame dan Pandas untuk Spark DataFrames. Untuk tugas penyerapan data, Databricks merekomendasikan penggunaan tabel streaming untuk sebagian besar kasus penggunaan. Tabel streaming baik untuk menyerap data dari penyimpanan objek cloud menggunakan Auto Loader atau dari bus pesan seperti Kafka.

Nota

  • Tidak semua sumber data memiliki dukungan SQL untuk penyerapan. Anda dapat mencampur sumber SQL dan Python dalam alur untuk menggunakan Python di mana diperlukan, dan SQL untuk operasi lain dalam alur yang sama.
  • Untuk informasi lebih lanjut tentang penggunaan pustaka yang tidak termasuk dalam Alur Deklaratif Lakeflow Spark secara default, lihat Mengelola dependensi Python untuk alur.
  • Untuk informasi umum tentang penyerapan di Azure Databricks, lihat Konektor standar di Lakeflow Connect.

Contoh di bawah ini menunjukkan beberapa pola umum.

Memuat dari tabel yang sudah ada

Muat data dari tabel yang sudah ada di Azure Databricks. Anda dapat mengubah data menggunakan kueri, atau memuat tabel untuk pemrosesan lebih lanjut di alur Anda.

Contoh berikut membaca data dari tabel yang sudah ada:

Phyton

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Memuat file dari penyimpanan objek cloud

Databricks merekomendasikan penggunaan Auto Loader dalam pipeline untuk sebagian besar tugas pengambilan data dari penyimpanan objek cloud atau dari file dalam volume Katalog Unity. Auto Loader dan pipeline dirancang untuk memuat data yang terus bertambah secara inkremental dan idempoten saat tiba di penyimpanan cloud.

Lihat Apa itu Auto Loader? dan Memuat data dari penyimpanan objek.

Contoh berikut membaca data dari penyimpanan cloud menggunakan Auto Loader:

Phyton

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Contoh berikut menggunakan Auto Loader untuk membuat himpunan data dari file CSV dalam volume Katalog Unity:

Phyton

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Nota

  • Jika Anda menggunakan Auto Loader dengan pemberitahuan file dan menjalankan refresh penuh untuk alur atau tabel streaming, Anda harus membersihkan sumber daya Anda secara manual. Anda dapat menggunakan CloudFilesResourceManager di notebook untuk melakukan pembersihan.
  • Untuk memuat file dengan Auto Loader dalam alur Katalog Unity yang diaktifkan, Anda harus menggunakan lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan alur, lihat Menggunakan Katalog Unity dengan alur.

Memuat data dari sistem bus pesan

Anda dapat mengonfigurasi alur untuk menyerap data dari bus pesan. Databricks merekomendasikan penggunaan tabel streaming dengan eksekusi berkelanjutan dan penskalaan otomatis yang lebih canggih untuk memberikan penyerapan paling efisien untuk beban latensi rendah dari bus pesan. Lihat Mengoptimalkan pemanfaatan kluster Alur Deklaratif Lakeflow Spark dengan Autoscaling.

Misalnya, kode berikut mengonfigurasi tabel streaming untuk menyerap data dari Kafka, menggunakan fungsi read_kafka :

Phyton

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Untuk mengambil data dari sumber bus pesan lainnya, lihat:

Memuat data dari Azure Event Hubs

Azure Event Hubs adalah layanan streaming data yang menyediakan antarmuka yang kompatibel dengan Apache Kafka. Anda dapat menggunakan konektor Kafka Streaming Terstruktur, yang disertakan dalam runtime Alur Deklaratif Lakeflow Spark, untuk memuat pesan dari Azure Event Hubs. Untuk mempelajari selengkapnya tentang memuat dan memproses pesan dari Azure Event Hubs, lihat Menggunakan Azure Event Hubs sebagai sumber data alur.

Memuat data dari sistem eksternal

Lakeflow Spark Declarative Pipelines mendukung pemuatan data dari sumber data apa pun yang didukung oleh Azure Databricks. Lihat Menyambungkan ke sumber data dan layanan eksternal. Anda juga dapat memuat data eksternal menggunakan Federasi Lakehouse untuk sumber data yang didukung. Karena Federasi Lakehouse membutuhkan Databricks Runtime 13.3 LTS atau yang lebih tinggi, untuk menggunakan Federasi Lakehouse, alur Anda harus dikonfigurasi agar menggunakan saluran pratinjau.

Beberapa sumber data tidak memiliki dukungan yang setara di SQL. Jika Anda tidak dapat menggunakan Federasi Lakehouse dengan salah satu sumber data ini, Anda dapat menggunakan Python untuk menyerap data dari sumbernya. Anda dapat menambahkan file sumber Python dan SQL ke alur yang sama. Contoh berikut mendeklarasikan tampilan materialisasi untuk mengakses status data saat ini dalam tabel PostgreSQL jarak jauh:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Memuat himpunan data kecil atau statis dari penyimpanan objek cloud

Anda dapat memuat himpunan data kecil atau statis menggunakan sintaks beban Apache Spark. Lakeflow Spark Declarative Pipelines mendukung semua format file yang didukung oleh Apache Spark di Azure Databricks. Untuk daftar lengkapnya, lihat Opsi format data .

Contoh berikut menunjukkan pemuatan JSON untuk membuat tabel:

Phyton

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Nota

Fungsi SQL read_files umum untuk semua lingkungan SQL di Azure Databricks. Ini adalah pola yang direkomendasikan untuk akses file langsung menggunakan SQL dalam alur. Untuk informasi selengkapnya, lihat opsi .

Memuat data dari sumber data kustom Python

Sumber data kustom Python memungkinkan Anda memuat data dalam format kustom. Anda dapat menulis kode untuk membaca dari dan menulis ke sumber data eksternal tertentu, atau memanfaatkan kode Python yang ada di sistem yang ada untuk membaca data dari sistem internal Anda sendiri. Untuk detail selengkapnya tentang mengembangkan sumber data Python, lihat sumber data kustom PySpark.

Untuk menggunakan sumber data kustom Python untuk memuat data ke dalam alur, daftarkan dengan nama format, seperti my_custom_datasource, lalu baca darinya:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Mengonfigurasi tabel streaming untuk mengabaikan perubahan dalam tabel streaming sumber

Nota

  • Bendera skipChangeCommits hanya berfungsi ketika digunakan bersama spark.readStream melalui fungsi option(). Anda tidak dapat menggunakan bendera ini dalam fungsi dp.read_stream() .
  • Anda tidak dapat menggunakan skipChangeCommits bendera saat tabel streaming sumber didefinisikan sebagai target fungsi create_auto_cdc_flow().

Secara default, tabel streaming memerlukan sumber khusus tambahan. Saat tabel streaming menggunakan tabel streaming lain sebagai sumber, dan tabel streaming sumber memerlukan pembaruan atau penghapusan, misalnya, pemrosesan GDPR "hak untuk dilupakan", parameter skipChangeCommits dapat digunakan ketika membaca tabel streaming sumber untuk tidak memperhatikan perubahan tersebut. Untuk informasi selengkapnya tentang bendera ini, lihat Mengabaikan pembaruan dan menghapus.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Akses kredensial penyimpanan dengan aman menggunakan rahasia dalam alur kerja

Anda dapat menggunakan Azure Databricks rahasia untuk menyimpan kredensial seperti kunci akses atau kata sandi. Untuk mengonfigurasi kata sandi dalam alur Anda, gunakan properti Spark dalam konfigurasi kluster pengaturan alur. Lihat Mengonfigurasi komputasi klasik untuk alur.

Contoh berikut menggunakan rahasia untuk menyimpan kunci akses yang diperlukan untuk membaca data input dari akun penyimpanan Azure Data Lake Storage (ADLS) menggunakan Auto Loader. Anda dapat menggunakan metode yang sama ini untuk mengonfigurasi rahasia apa pun yang diperlukan oleh alur Anda, misalnya, kunci AWS untuk mengakses S3, atau kata sandi ke metastore Apache Hive.

Untuk informasi lebih lanjut tentang bekerja dengan Azure Data Lake Storage, lihat Menghubungkan ke Azure Data Lake Storage dan Blob Storage.

Nota

Anda harus menambahkan awalan spark.hadoop. ke kunci konfigurasi spark_conf yang mengatur nilai rahasia.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Mengganti

  • Gunakan <storage-account-name> dengan nama akun penyimpanan ADLS.
  • <scope-name> sebagai nama cakupan rahasia untuk Azure Databricks.
  • <secret-name> dengan nama kunci yang berisi kunci akses akun penyimpanan Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Mengganti

  • <container-name> dengan nama kontainer akun penyimpanan Azure yang menyimpan data input.
  • Gunakan <storage-account-name> dengan nama akun penyimpanan ADLS.
  • <path-to-input-dataset> dengan rute ke himpunan data input.