使用 Azure Synapse Link 中的 Apache Spark 3 與 Azure Cosmos DB 互動

在本文中,您將了解如何使用 Synapse Apache Spark 3 與 Azure Cosmos DB 互動。 Synapse Apache Spark 3 提供 Scala、Python、SparkSQL 和 C# 的完整支援,是 Azure Cosmos DB 的 Azure Synapse Link 中分析、資料工程、資料科學和資料探索環境的核心。

與 Azure Cosmos DB 互動時,系統支援下列功能:

  • Synapse Apache Spark 3 讓您接近即時的方式,以 Azure Synapse Link 啟用 Azure Cosmos DB 容器分析之中的資料,但不影響交易式工作負載的效能。 下列兩個選項可用來查詢 Spark 中的 Azure Cosmos DB 分析存放區
    • 載入至 Spark 資料框架
    • 建立 Spark 資料表
  • Synapse Apache Spark 也可讓您將資料內嵌至 Azure Cosmos DB。 請務必注意,資料一律會透過交易式存放區內嵌到 Azure Cosmos DB 容器。 啟用 Synapse 連結時,任何新的插入、更新和刪除都會自動同步處理到分析存放區。
  • Synapse Apache Spark 也支援使用 Azure Cosmos DB 作為來源和接收的 Spark 結構化串流。

下列各節將逐步解說上述功能的語法。 您也可以查看學習 Learn 模組,了解如何使用適用於 Azure Synapse Analytics 的 Apache Spark 查詢 Azure Cosmos DB。 Azure Synapse Analytics 工作區中手勢的設計,是為了讓您在開始使用時能有簡單的立即可用體驗。 在 Synapse 工作區的資料索引標籤中,以滑鼠右鍵按一下 Azure Cosmos DB 容器時,就會顯示手勢。 您可以透過手勢快速產生程式碼,並依據您的需求進行調整。 按一下以探索資料時,也非常適合使用手勢。

重要

請留意分析結構描述中部分的限制,這些限制可能導致資料載入作業發生未預期的行為。 例如,在分析結構描述中,只有交易結構描述的前 1000 個屬性可以使用,而包含空格的屬性無法使用等。如果您遇到一些未預期的結果,請參考分析存放區結構描述限制,了解更多詳細資訊。

查詢 Azure Cosmos DB 分析存放區

在了解可用來查詢 Azure Cosmos DB 分析存放區、載入 Spark 資料框架和建立 Spark 資料表的兩個可能選項之前,請先探索經驗的差異,以便選擇適合您需求的選項。

體驗方面的差異在於,Azure Cosmos DB 容器中的基礎資料變更是否應該自動反映在 Spark 中執行的分析。 當 Spark 資料框架已註冊,或針對容器的分析存放區建立 Spark 資料表時,系統會將分析存放區中目前資料快照集的中繼資料提取至 Spark,以有效率地推送後續分析。 請務必注意,由於 Spark 會遵循延遲評估原則,除非在 Spark 資料框架上叫用動作,或針對 Spark 資料表執行 SparkSQL 查詢,否則系統不會從基礎容器的分析存放區提取實際的資料。

載入至 Spark 資料框架的情況下,系統會透過 Spark 工作階段的存留期快取提取的中繼資料,因此在建立資料框架時,系統會針對分析存放區的快照集評估在資料框架上叫用的後續動作。

另一方面,在建立 Spark 資料表的情況下,系統不會在 Spark 中快取分析存放區狀態的中繼資料,而且會在每次對 Spark 資料表執行 SparkSQL 查詢時重新載入。

因此,您可以選擇要針對分析存放區的固定快照集,或分別針對分析存放區的最新快照集評估您的 Spark 分析來載入 Spark 資料框架並建立 Spark 資料表。

注意

若要查詢 Azure Cosmos DB for MongoDB 帳戶,請深入了解分析存放區中的完整精確度結構描述標記法,以及要使用的擴充屬性名稱。

注意

請注意,options下列命令需區分大小寫。

載入至 Spark 資料框架

在此範例中,您將建立指向 Azure Cosmos DB 分析存放區的 Spark 資料框架。 接著,您可以對資料框架叫用 Spark 動作,以執行額外的分析。 此作業不會影響交易存放區。

Python 中的語法如下:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Scala 中對等的語法如下:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

建立 Spark 資料表

在此範例中,您將建立指向 Azure Cosmos DB 分析存放區的 Spark 資料表。 接著,您可以對資料表叫用 SparkSQL 查詢,以執行額外的分析。 此作業並不會影響交易式存放區,也不會移動任何資料。 如果您決定刪除此 Spakr 資料表,基礎 Azure Cosmos DB 容器和對應的分析存放區將不受影響。

此案例便於透過第三方工具重複使用 Spark 資料表,並提供對執行階段基礎資料的存取性。

建立 Spark 資料表的語法如下:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

注意

如果您有基礎 Azure Cosmos DB 容器的結構描述在一段時間內變更的情節,並希望結構描述更新後可自動反映 Spark 資料表的查詢,請將 Spark 資料表選項中的spark.cosmos.autoSchemaMerge選項設為true

將 Spark 資料框架寫入 Azure Cosmos DB 容器

在此範例中,您會將 Spark 資料框架寫入 Azure Cosmos DB 容器。 這項作業會影響交易式工作負載的效能,並取用在 Azure Cosmos DB 容器或共用資料庫上佈建的要求單位。

Python 中的語法如下:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Scala 中對等的語法如下:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

從容器載入串流 DataFrame

在此手勢中,您將使用 Spark 串流功能將容器中的資料載入至 DataFrame。 資料將會儲存在已連線至工作區的主要 Data Lake 帳戶 (和檔案系統) 中。

注意

如果要參考 Synapse Apache Spark 中的外部程式庫,請在此深入了解。 比方說,如果要將 Spark 資料框架內嵌至 Azure Cosmos DB for MongoDB 容器,可以在利用這裡的適用於 Spark 的 Mongo DB 連接器。

從 Azure Cosmos DB 容器載入串流 DataFrame

在此範例中,您將使用 Spark 的結構化串流功能,從 Azure Cosmos DB 容器將資料載入至使用 Azure Cosmos DB 中變更摘要功能的 Spark 串流資料框架。 Spark 使用的檢查點資料將會儲存在已連線至工作區的主要 Data Lake 帳戶 (和檔案系統) 中。

Python 中的語法如下:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Scala 中對等的語法如下:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

將串流資料框架寫入 Azure Cosmos DB 容器

在此範例中,您會將串流資料框架寫入 Azure Cosmos DB 容器。 這項作業會影響交易式工作負載的效能,並取用在 Azure Cosmos DB 容器或共用資料庫上佈建的要求單位。 如果未建立 /localWriteCheckpointFolder 資料夾 (在下列範例中),系統會自動建立。

Python 中的語法如下:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Scala 中對等的語法如下:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

後續步驟