使用 Spark 查詢 Cosmos DB 資料
針對已啟用分析存放區的 Azure Cosmos DB 資料庫,新增連結服務後,您可以使用連結服務,查詢使用 Azure Synapse Analytics 工作區中 Spark 集區的資料。
將 Azure Cosmos DB 分析資料載入資料框架
若要從 Azure Cosmos DB 連結服務初始探索或快速分析資料,通常最簡單的方式是使用 PySpark (Python 的 Spark 特定實作),或 Scala (Spark 上常用的 Java 型語言) 等Spark 支援語言,將資料從容器載入資料框架。
例如,下列 PySpark 程式代碼可用來從連線到 的 my-container 容器中的數據載入名為 df 的數據框架,並使用 my_linked_service鏈接服務來顯示前 10 個數據列:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
假設 my-container 容器是用來儲存類似下列範例的專案:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
PySpark 程式碼的輸出會類似下表:
_rid | _ts | productID | 產品名稱 | 識別碼 | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | 控件 | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
資料會從容器中的分析存放區載入,而不是作業存放區,請確保作業存放區沒有查詢額外負荷。 分析數據存放區中的欄位包括應用程式定義的欄位(在此案例中為 productID 和 productName),以及自動建立的元數據欄位。
載入資料框架後,您可以使用資料框架的原生方法探索資料。 例如,下列程式代碼會建立只包含 productID 和 productName 數據行的新數據框架,並依 productName 排序:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
此程式碼的輸出會類似下表:
productID | 產品名稱 |
---|---|
125 | Thingumy |
123 | 控件 |
124 | Wotsit |
... | ... |
將資料框架寫入 Cosmos DB 容器
在大部分 HTAP 案例中,您應該使用連結服務,將資料從分析存放區讀取至 Spark。 不過 ,您可以將 數據框架的內容寫入容器,如下列範例所示:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
注意
將數據框架寫入容器會更新 作業 存放區,而且可能會影響其效能。 然後同步變更至分析存放區。
使用 Spark SQL 查詢 Azure Cosmos DB 分析資料
Spark SQL 是 Spark API,並在 Spark 集區中提供 SQL 語言語法和關聯式資料庫語意。 您可以使用 Spark SQL 針對可使用 SQL 查詢的資料表,定義中繼資料。
例如,下列程式代碼會根據先前範例中使用的假設容器,建立名為 Products 的數據表:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
提示
程式 %%sql
代碼開頭的 關鍵詞是一種 魔術 ,可指示 Spark 集區以 SQL 的形式執行程式代碼,而不是預設語言(通常設定為 PySpark)。
使用此方法,您可以在 Spark 集區中建立邏輯資料庫,然後用來查詢 Azure Cosmos DB 中的分析資料,並支援資料分析和報告工作負載,而不影響 Azure Cosmos DB 帳戶中的作業存放區。