教學課程:使用 Spark 連線至適用於 NoSQL 的 Azure Cosmos DB
適用於:NoSQL
在本教學課程中,您會使用 Azure Cosmos DB Spark 連接器,從 Azure Cosmos DB for NoSQL 帳戶讀取或寫入資料。 本教學課程使用 Azure Databricks 和 Jupyter 筆記本,藉此說明如何從 Spark 整合 API for NoSQL。 本教學課程著重於 Python 和 Scala,不過您可以使用 Spark 所支援的任何語言或介面。
在本教學課程中,您會了解如何:
- 使用 Spark 和 Jupyter Notebook 連線到 NoSQL 帳戶的 API。
- 建立資料庫和容器資源。
- 將數據內嵌至容器。
- 查詢容器中的數據。
- 對容器中的項目執行一般作業。
必要條件
- 現有的 Azure Cosmos DB for NoSQL 帳戶。
- 如果您有現有的 Azure 訂用帳戶,請建立新的帳戶。
- 沒有 Azure 訂用帳戶? 您可以免費試用 Azure Cosmos DB,不需要信用卡。
- 現有 Azure Databricks 工作區。
使用 Spark 和 Jupyter 進行連線
使用現有的 Azure Databricks 工作區建立計算叢集,準備使用 Apache Spark 3.4.x 連線至 Azure Cosmos DB for NoSQL 帳戶。
開啟您的 Azure Databricks 工作區。
在工作區介面中,建立新的叢集。 設定叢集時,請至少符合下列設定:
版本 值 執行階段版本 13.3 LTS (Scala 2.12, Spark 3.4.1) 使用工作區介面,從 Maven Central 搜尋具有群組標識碼的
com.azure.cosmos.spark
Maven 套件。 特別針對Spark 3.4安裝套件,其前置詞為azure-cosmos-spark_3-4
叢集的成品標識碼。最後,建立新的筆記本。
提示
根據預設,筆記本會附加至最近建立的叢集。
在筆記本中,設定 NoSQL 帳戶端點、資料庫名稱和容器名稱的在線事務處理 (OLTP) 組態設定。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
建立資料庫與容器
使用目錄 API 管理帳戶資源,例如資料庫和容器。 然後,您可以使用 OLTP 來管理容器資源中的數據。
使用 Spark 設定目錄 API 來管理 NoSQL 資源的 API。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
使用
CREATE DATABASE IF NOT EXISTS
建立名為cosmicworks
的新資料庫。# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
使用
CREATE TABLE IF NOT EXISTS
建立名為products
的新容器。 請確定您已將分割區索引鍵路徑設定為/category
,並啟用每秒要求單位最大輸送量的1000
自動調整輸送量。# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
使用階層式分割區索引鍵組態建立名為
employees
的另一個容器。 使用/organization
、/department
和/team
作為分割區索引鍵路徑的集合。 遵循該特定順序。 此外,請將輸送量設定為手動 RU 數量400
。# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
執行筆記本數據格,以驗證您的資料庫和容器是否在 NoSQL 帳戶的 API 內建立。
內嵌資料
建立範例數據集。 然後使用 OLTP 將資料內嵌至 NoSQL 容器的 API。
建立範例數據集。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
使用
spark.createDataFrame
和先前儲存的 OLTP 設定,將範例資料新增至目標容器。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
查詢資料
將 OLTP 資料載入資料框架,以對資料執行一般查詢。 您可以使用各種語法來篩選或查詢資料。
使用
spark.read
將 OLTP 資料載入資料框架物件。 使用您稍早在本教學課程中使用的相同組態。 此外,將 設定spark.cosmos.read.inferSchema.enabled
true
為 ,以允許Spark連接器藉由取樣現有專案來推斷架構。# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
使用
printSchema
呈現在數據框架中載入之數據的架構。# Render schema df.printSchema()
// Render schema df.printSchema()
轉譯資料列,其中
quantity
資料行小於20
。 使用where
和show
函式執行此查詢。# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
轉譯數據行為
true
的第一個數據列clearance
。 使用filter
函式執行此查詢。# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
轉譯五個沒有篩選或截斷的資料列。 使用
show
函式自訂轉譯的外觀和資料列數目。# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
使用此原始 NoSQL 查詢字串來查詢您的資料:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
執行一般作業
當您在 Spark 中使用適用於 NoSQL 的 API 數據時,您可以執行部分更新,或以原始 JSON 的形式處理數據。
若要執行專案的部分更新:
複製現有的
config
組態變數,並修改新複本中的屬性。 具體來說,將寫入策略設定為ItemPatch
。 然後停用大量支援。 設定數據行和對應的作業。 最後,將預設作業類型設定為Set
。# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
針對您打算作為此修補作業目標之一的項目分割區索引鍵和唯一識別碼建立變數。
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
建立一組修補程式物件來指定目標項目,並指定應修改的欄位。
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
使用一組修補程序物件建立數據框架。 使用
write
來執行修補作業。# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
執行查詢以檢閱修補作業的結果。 該項目現在應命名為
Yamba New Surfboard
,沒有任何其他變更。# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
若要使用原始 JSON 資料:
複製現有的
config
組態變數,並修改新複本中的屬性。 具體來說,將目標容器變更為employees
。 然後設定數據contacts
行/欄位以使用原始 JSON 資料。# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
建立一組員工以內嵌至容器。
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
建立資料框架,並使用
write
內嵌員工資料。# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
使用
show
從數據框架轉譯數據。 觀察輸出中的contacts
資料行是原始 JSON。# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
相關內容
- Apache Spark
- Azure Cosmos DB 目錄 API
- 組態參數參考
- Azure Cosmos DB Spark 連接器範例
- 從 Spark 2.4 移轉至 Spark 3.*
- 版本相容性:
- 版本資訊:
- 下載連結: