共用方式為


教學課程:使用變更資料擷取建置 ETL 管線

瞭解如何使用 Lakeflow Spark 說明式管線 (SDP) 建立和部署 ETL (擷取、轉換和載入) 流程,並利用 Auto Loader 和變更資料擷取 (CDC) 進行資料編排。 ETL 管線會實作從來源系統讀取數據的步驟、根據需求轉換該數據,例如數據品質檢查和記錄重複數據,以及將數據寫入目標系統,例如數據倉儲或數據湖。

在本教學課程中,您將使用 MySQL customers 資料庫中資料表的數據來:

  • 使用 Debezium 或其他工具從交易式資料庫擷取變更,並將其儲存至雲端物件儲存 (S3、ADLS 或 GCS)。 在本教學課程中,您會略過設定外部 CDC 系統,而是產生虛假資料以簡化教學課程。
  • 使用自動載入器,以累加方式從雲端物件記憶體載入訊息,並將原始訊息儲存在數據表中 customers_cdc 。 自動載入器會推斷結構描述並處理結構描述演進。
  • 建立 customers_cdc_clean 表格以使用預期檢查資料品質。 例如,id 永遠不應該是 null,因為它是用於執行 upsert 作業。
  • 對已清除的 CDC 資料執行, AUTO CDC ... INTO 以將變更更新插入到最終 customers 表格中。
  • 顯示管線如何建立類型 2 緩慢變更維度 (SCD2) 表格,以追蹤所有變更。

目標是近乎即時地內嵌原始數據,併為分析師小組建置數據表,同時確保數據品質。

本教學課程使用 medallion Lakehouse 架構,其會透過銅層內嵌原始數據、使用銀層清理和驗證數據,並使用金層套用維度模型化和匯總。 如需詳細資訊,請參閱獎章湖屋架構的概念

實作的流程如下所示:

搭載 CDC 的管道

如需管線、自動載入器和 CDC 的詳細資訊,請參閱 Lakeflow Spark 宣告式管線什麼是自動載入器?什麼是變更資料擷取 (CDC)?

需求

您必須滿足下列需求,才能完成本教學課程:

  • 登入 Azure Databricks 工作區。
  • 為您的工作區啟用 Unity 目錄
  • 針對您的帳戶啟用 無伺服器計算 。 無伺服器化 Lakeflow Spark 宣告式管線並非在所有工作區域中都可用。 如需了解具有有限區域可用性的功能,可用於哪些區域,請參閱 區域列表。 如果您的帳戶未啟用無伺服器計算,則這些步驟應該可以使用您工作區的預設運算資源。
  • 有權 建立計算資源存取計算資源
  • 具有在 目錄中建立新架構的許可權。 必要的權限為 ALL PRIVILEGESUSE CATALOG 與 。 CREATE SCHEMA
  • 具有在 現有架構中建立新磁碟區的許可權。 必要的權限為 ALL PRIVILEGESUSE SCHEMA 與 。 CREATE VOLUME

變更 ETL 管線中的數據擷取

變更資料擷取 (CDC) 是擷取對交易式資料庫 (例如 MySQL 或 PostgreSQL) 或資料倉儲所做的記錄變更的程序。 CDC 會擷取資料刪除、附加和更新等作業,通常作為串流來重新具體化外部系統中的資料表。 CDC 具備增量載入的能力,同時消除大量載入更新的需求。

備註

若要簡化本教學課程,請略過設定外部 CDC 系統。 假設系統正在執行並將 CDC 資料儲存為雲端物件儲存服務(例如 S3、ADLS 或 GCS)中的 JSON 檔案。 本教學課程使用 Faker 程式庫來產生教學課程中使用的資料。

捕獲 CDC

有各種 CDC 工具可供使用。 Debezium 是領先的開源解決方案之一,但也存在其他簡化資料來源的實作,例如 Fivetran、Qlik Replicate、StreamSets、Talend、Oracle GoldenGate 和 AWS DMS。

在本教學課程中,您會使用來自外部系統的 CDC 數據,例如 Debezium 或 DMS。 Debezium 會擷取每個變更的數據列。 它通常會將資料變更的歷史記錄傳送至 Kafka 主題,或將其儲存為檔案。

您必須從 customers 資料表(JSON 格式)中擷取 CDC 資訊,檢查其正確無誤,然後在 Lakehouse 中具現化 customers 資料表。

從 Debezium 獲得的 CDC 輸入

對於每個變更,您會收到一則 JSON 訊息,其中包含要更新之列的所有欄位 (idfirstnamelastnameemail, , )。 address 該消息還包括其他元數據:

  • operation:作業程序代碼,通常是 (DELETEAPPEND、 。 UPDATE
  • operation_date:每個作業動作記錄的日期和時間。

Debezium 之類的工具可以產生更進階的輸出,例如變更前的數據列值,但本教學課程會省略它們以求簡單。

步驟 1:建立管線

建立新的 ETL 管線來查詢您的 CDC 資料來源,並在工作區中產生資料表。

  1. 在您的工作區中,按一下左上角的加號圖示。 新增。

  2. 按一下 ETL 管線

  3. 將管線的標題變更為 Pipelines with CDC tutorial 或您偏好的名稱。

  4. 在標題下,選擇您具有寫入許可權的目錄和結構描述。

    如果您未在程式碼中指定目錄或結構描述,則預設會使用此目錄和結構描述。 您的程式碼可以寫入至任何目錄或模式,透過指定完整路徑。 本教學課程會使用您在此處指定的預設值。

  5. 進階選項中,選取從 空白檔案開始

  6. 為您的程式碼選擇一個資料夾。 您可以選取 瀏覽 以瀏覽工作區中的資料夾清單。 您可以選擇您擁有寫入權限的任何資料夾。

    若要使用版本控制,請選取 Git 資料夾。 如果您需要建立新資料夾,請選取 加 圖示。 按鈕。

  7. 根據您要用於教學課程的語言,選擇 PythonSQL 作為檔案的語言。

  8. 按一下 [選取] 以建立具有這些設定的管線,然後開啟 Lakeflow 管線編輯器。

您現在有一個空白管道,其中包含預設目錄和結構描述。 接下來,設定要在教學課程中匯入的範例資料。

步驟 2:建立要在本教學課程中匯入的範例資料

如果您要從現有來源匯入自己的資料,則不需要此步驟。 在本教學課程中,請產生虛假資料作為教學課程的範例。 建立筆記本以執行 Python 資料產生指令碼。 此程式碼只需要執行一次即可產生範例資料,因此請在管線的 explorations 資料夾中建立它,這不會作為管線更新的一部分執行。

備註

此程式碼使用 Faker 產生範例 CDC 資料。 Faker 可以自動安裝,因此本教學使用 %pip install faker。 您也可以為筆記本設定對 *faker* 的依賴。 請參閱 將相依性新增至筆記本

  1. 在 Lakeflow 管線編輯器中,在編輯器左側的資產瀏覽器側邊欄中,按一下 加號 圖示。新增,然後選擇探索。

  2. 為其 指定名稱,例如 Setup data,選取 Python。 您可以保留預設目標資料夾,也就是新 explorations 資料夾。

  3. 點擊 建立。 這會在新資料夾中建立筆記本。

  4. 在第一個儲存格中輸入下列程式碼。 您必須變更 <my_catalog><my_schema> 的定義以符合您在上一個程序中選取的預設型錄和模式:

    %pip install faker
    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = dbName = db = "<my_schema>"
    
    spark.sql(f'USE CATALOG `{catalog}`')
    spark.sql(f'USE SCHEMA `{schema}`')
    spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
    volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"
    
    try:
      dbutils.fs.ls(volume_folder+"/customers")
    except:
      print(f"folder doesn't exist, generating the data under {volume_folder}...")
      from pyspark.sql import functions as F
      from faker import Faker
      from collections import OrderedDict
      import uuid
      fake = Faker()
      import random
    
      fake_firstname = F.udf(fake.first_name)
      fake_lastname = F.udf(fake.last_name)
      fake_email = F.udf(fake.ascii_company_email)
      fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
      fake_address = F.udf(fake.address)
      operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
      fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
      fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
    
      df = spark.range(0, 100000).repartition(100)
      df = df.withColumn("id", fake_id())
      df = df.withColumn("firstname", fake_firstname())
      df = df.withColumn("lastname", fake_lastname())
      df = df.withColumn("email", fake_email())
      df = df.withColumn("address", fake_address())
      df = df.withColumn("operation", fake_operation())
      df_customers = df.withColumn("operation_date", fake_date())
      df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
    
  5. 若要產生教學課程中使用的資料集,請輸入 Shift + Enter 以執行程式碼:

  6. 選擇性。 若要預覽本教學課程中使用的資料,請在下一個儲存格中輸入下列程式碼,然後執行程式碼。 更新目錄和結構描述,以符合先前程式碼中的路徑。

    # Update these to match the catalog and schema
    # that you used for the pipeline in step 1.
    catalog = "<my_catalog>"
    schema = "<my_schema>"
    
    display(spark.read.json(f"/Volumes/{catalog}/{schema}/raw_data/customers"))
    

這會產生一個大型資料集(包含虛假的 CDC 資料),您可以在教學課程的其餘部分使用。 在下一個步驟中,使用自動載入器擷取資料。

步驟 3:使用 Auto Loader 以增量方式擷取資料

下一步是將原始資料從(偽造的)雲端儲存擷取到青銅層中。

這可能會因為多種原因而面臨挑戰,因為您必須:

  • 大規模運作,可能會處理數百萬個小檔案。
  • 推斷架構和 JSON 類型。
  • 處理使用不正確 JSON 架構的錯誤記錄。
  • 處理架構演進(例如,客戶表中的新欄位)。

自動載入器可簡化此擷取,包括結構描述推斷和結構描述演變,同時擴展至數百萬個傳入檔案。 自動載入器可在 Python 中使用 cloudFiles 和 在 SQL 中使用 SELECT * FROM STREAM read_files(...) ,而且可以搭配各種格式使用(JSON、CSV、Apache Avro 等):

將資料表定義為串流資料表,可保證您只取用新的傳入資料。 如果您未將它定義為串流表格,它會掃描並擷取所有可用的資料。 如需詳細資訊 ,請參閱串流數據表

  1. 若要使用自動載入器擷取傳入的 CDC 資料,請將下列程式碼複製並貼到使用管線建立的程式碼檔中 (稱為 my_transformation.py)。 您可以根據您在建立管線時選擇的語言,使用 Python 或 SQL。 請務必將 <catalog><schema> 替換為您為此管道設定的預設值。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # Replace with the catalog and schema name that
    # you are using:
    path = "/Volumes/<catalog>/<schema>/raw_data/customers"
    
    
    # Create the target bronze table
    dp.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")
    
    # Create an Append Flow to ingest the raw data into the bronze table
    @dp.append_flow(
      target = "customers_cdc_bronze",
      name = "customers_bronze_ingest_flow"
    )
    def customers_bronze_ingest_flow():
      return (
          spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "json")
              .option("cloudFiles.inferColumnTypes", "true")
              .load(f"{path}")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_bronze_ingest_flow AS
    INSERT INTO customers_cdc_bronze BY NAME
      SELECT *
      FROM STREAM read_files(
        -- replace with the catalog/schema you are using:
        "/Volumes/<catalog>/<schema>/raw_data/customers",
        format => "json",
        inferColumnTypes => "true"
      )
    
  2. 按一下 播放圖示。執行檔案或執行管線,以啟動已連線管線的更新。 如果您的管線中只有一個來源檔案,這些檔案在功能上是等效的。

更新完成時,編輯器將會顯示更新後的管線資訊。

  • 程式碼右側側邊欄中的管線圖形 (DAG) 會顯示單一資料表 customers_cdc_bronze
  • 更新的摘要會顯示在管線資產瀏覽器的頂端。
  • 所產生表格的詳細資料會顯示在底部窗格中,您可以選取表格來瀏覽資料表中的資料。

這是從雲端儲存匯入的原始青銅層資料。 在下一個步驟中,清理資料以建立銀層資料表。

步驟 4:整理和期望來追蹤資料品質

定義青銅層之後,透過新增期望來建立銀層,以控制資料品質。 檢查以下條件:

  • 識別子永遠不得為 null
  • CDC 作業類型必須有效。
  • 自動載入器必須正確讀取 JSON。

不符合這些條件的資料列會被移除。

如需更多資訊,請參閱使用管道期望管理數據品質

  1. 從管道資產瀏覽器側邊欄中,按一下 加號圖示。新增,然後 轉換

  2. 輸入 名稱 ,然後選擇原始程式碼檔案的語言 (Python 或 SQL)。 您可以在管線中混合使用語言,因此可以為此步驟選擇任意一種語言。

  3. 若要建立具有已清理資料表的銀色圖層並施加限制,請將下列程式碼複製並貼到新檔案中 (根據檔案的語言選擇 Python 或 SQL)。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(
      name = "customers_cdc_clean",
      expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
      )
    
    @dp.append_flow(
      target = "customers_cdc_clean",
      name = "customers_cdc_clean_flow"
    )
    def customers_cdc_clean_flow():
      return (
          spark.readStream.table("customers_cdc_bronze")
              .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
      )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
      CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
      CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
    )
    COMMENT "New customer data incrementally ingested from cloud object storage landing zone";
    
    CREATE FLOW customers_cdc_clean_flow AS
    INSERT INTO customers_cdc_clean BY NAME
    SELECT * FROM STREAM customers_cdc_bronze;
    
  4. 按一下 播放圖示。執行檔案或執行管線,以啟動已連線管線的更新。

    因為現在有兩個原始檔,所以它們不會做同樣的事情,但在這種情況下,輸出是相同的。

    • 執行管線 會運行您的整個管線,包括步驟 3 中的程式碼。 如果您的輸入資料正在更新,這會將來源的任何變更匯入到您的青銅層。 這不會從資料設定步驟執行程式碼,因為該步驟位於探索資料夾中,而不是管線來源的一部分。
    • 執行檔案只會 執行目前的來源檔案。 在這種情況下,在不更新輸入資料的情況下,這會從快取的青銅表產生白銀資料。 在建立或編輯管線程式碼時,只執行此檔案以加快反覆專案速度會很有用。

更新完成後,您可以看到管線圖表現在顯示兩個表格(銀色圖層取決於青銅層),底部面板顯示兩個表格的詳細資訊。 現在在管線資產瀏覽器的頂端會顯示多次執行的時間,但只會提供最近一次執行的詳細資料。

接下來,創建表格的 customers 最終黃金層版本。

步驟 5:使用 AUTO CDC 流程具體化客戶資料表

到目前為止,表格只是在每個步驟中傳遞了 CDC 資料。 現在,建立 customers 資料表以包含最新的視圖,並且作為原始資料表的複本,而不是包含建立該資料表的 CDC 作業清單。

這不容易手動實作。 您必須考慮像資料去重這樣的因素,以保留最新的數據列。

不過,Lakeflow Spark 宣告式管線透過 AUTO CDC 作業來解決這些挑戰。

  1. 從管道資產瀏覽器的側邊欄中,按一下 加號圖示,然後選擇 添加轉換

  2. 輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。 您可以再次為此步驟選擇任一語言,但請使用下面的正確程式碼。

  3. 若要在 Lakeflow Spark 宣告式管道中使用 AUTO CDC 處理 CDC 資料,請將以下程式碼複製並貼上至新檔案中。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    dp.create_streaming_table(name="customers", comment="Clean, materialized customers")
    
    dp.create_auto_cdc_flow(
      target="customers",  # The customer table being materialized
      source="customers_cdc_clean",  # the incoming CDC
      keys=["id"],  # what we'll be using to match the rows to upsert
      sequence_by=col("operation_date"),  # de-duplicate by operation date, getting the most recent value
      ignore_null_updates=False,
      apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
      except_column_list=["operation", "operation_date", "_rescued_data"],
    )
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers;
    
    CREATE FLOW customers_cdc_flow
    AS AUTO CDC INTO customers
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 1;
    
  4. 按一下 播放圖示。執行檔案以 啟動已連線管線的更新。

更新完成後,您可以看到管線圖表顯示 3 個表格,從銅牌到銀牌再到金牌。

步驟 6:使用緩慢變更維度類型 2 (SCD2) 追蹤更新歷程記錄

建立表格通常需要追蹤由APPENDUPDATEDELETE所產生的所有變更:

  • 歷程記錄:您想要保留資料表所有變更的歷程記錄。
  • 可追溯性:您想要查看發生的操作。

帶有 Lakeflow SDP 的 SCD2

Delta 支援變更資料流程 (CDF),並 table_change 可在 SQL 和 Python 中查詢資料表修改。 不過,CDF 的主要使用案例是擷取管道中的變更,而不是從一開始就建立資料表變更的完整檢視。

如果您有順序錯亂的事件,實作會變得特別複雜。 如果您必須依時間戳記來排序變更,並接收過去發生的修改,則必須在 SCD 表格中附加新項目,並更新先前的項目。

Lakeflow SDP 消除了這種複雜性,並允許您建立一個單獨的資料表,其中包含從一開始的所有修改。 然後,可以大規模使用此表格,並視需要使用特定分割區或 ZORDER 直欄。 亂序欄位會根據_sequence_by自動處理。

若要建立 SCD2 表格,請使用 SQL 中的選項 STORED AS SCD TYPE 2 或 Python 中的選項 stored_as_scd_type="2"

備註

您也可以使用選項來限制功能追蹤的欄位:TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}

  1. 從管道資產瀏覽器的側邊欄中,按一下 加號圖示,然後選擇 添加轉換

  2. 輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。

  3. 將以下程式碼複製並貼到新檔案中。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    # create the table
    dp.create_streaming_table(
        name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
    )
    
    # store all changes as SCD2
    dp.create_auto_cdc_flow(
        target="customers_history",
        source="customers_cdc_clean",
        keys=["id"],
        sequence_by=col("operation_date"),
        ignore_null_updates=False,
        apply_as_deletes=expr("operation = 'DELETE'"),
        except_column_list=["operation", "operation_date", "_rescued_data"],
        stored_as_scd_type="2",
    )  # Enable SCD2 and store individual updates
    

    SQL

    CREATE OR REFRESH STREAMING TABLE customers_history;
    
    CREATE FLOW customers_history_cdc
    AS AUTO CDC INTO
      customers_history
    FROM stream(customers_cdc_clean)
    KEYS (id)
    APPLY AS DELETE WHEN
    operation = "DELETE"
    SEQUENCE BY operation_date
    COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
    STORED AS SCD TYPE 2;
    
  4. 按一下 播放圖示。執行檔案以 啟動已連線管線的更新。

更新完成後,管線圖表將包含新 customers_history 表格,並且依賴於銀層表格,而底部面板將顯示所有四個表格的詳細資訊。

步驟 7:建立具體化檢視,以追蹤誰變更其資訊最多

數據表 customers_history 包含使用者對其資訊所做的所有歷程記錄變更。 在黃金層中建立簡單的具體化檢視,以追蹤最常更改其資訊的是誰。 這可用於真實世界中的詐騙偵測分析或用戶建議。 此外,應用 SCD2 變更後已經刪除重複項,因此您可以直接計算每個使用者 ID 的行數。

  1. 從管道資產瀏覽器的側邊欄中,按一下 加號圖示,然後選擇 添加轉換

  2. 輸入 名稱 ,然後為新的原始程式碼檔案選擇語言 (Python 或 SQL)。

  3. 將下列程式碼複製並貼到新的原始檔中。

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    
    @dp.table(
      name = "customers_history_agg",
      comment = "Aggregated customer history"
    )
    def customers_history_agg():
      return (
        spark.read.table("customers_history")
          .groupBy("id")
          .agg(
              count("address").alias("address_count"),
              count("email").alias("email_count"),
              count("firstname").alias("firstname_count"),
              count("lastname").alias("lastname_count")
          )
      )
    

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
    SELECT
      id,
      count("address") as address_count,
      count("email") AS email_count,
      count("firstname") AS firstname_count,
      count("lastname") AS lastname_count
    FROM customers_history
    GROUP BY id
    
  4. 按一下 播放圖示。執行檔案以 啟動已連線管線的更新。

更新完成後,管道圖中會出現一個依賴於 customers_history 該表的新表,您可以在底部面板中查看它。 您的管線現已完成。 您可以執行完整的 執行流程來測試它。 剩下的唯一任務是排程管線以定期更新。

步驟 8:建立執行 ETL 流程的作業

接下來,建立工作流程,以使用 Databricks 作業將管線中的資料擷取、處理和分析步驟自動化。

  1. 在編輯器頂端,選擇 Schedule (排程 ) 按鈕。
  2. 如果出現 Schedules (排程) 對話方塊,請選擇 Add schedule (新增排程)。
  3. 這會開啟 [新增排程 ] 對話方塊,您可以在其中建立作業,以依排程執行管線。
  4. 可以選擇為任務命名。
  5. 依預設,排程設定為每天執行一次。 您可以接受此預設值,或設定自己的排程。 選擇 進 可讓您選擇設定工作執行的特定時間。 選取 [ 更多選項 ] 可讓您在工作執行時建立通知。
  6. 選取 [ 建立 ] 以套用變更並建立作業。

現在,作業將每天執行,讓您的管線保持最新狀態。 您可以再次選擇 Schedule (排程 ) 以檢視排程清單。 您可以從該對話方塊管理管線的排程,包括新增、編輯或移除排程。

按一下排程 (或作業) 的名稱,會帶您前往 [作業與管線 ] 清單中的作業頁面。 您可以從該處檢視有關工作執行的詳細資訊,包括執行歷史記錄,或使用 [立即執行] 按鈕立即執行工作。

如需作業執行的詳細資訊,請參閱 Lakeflow 作業的監視和可觀察性

其他資源