共用方式為


教學課程:使用 Spark 連線至適用於 NoSQL 的 Azure Cosmos DB

適用於:NoSQL

在本教學課程中,您會使用 Azure Cosmos DB Spark 連接器,從 Azure Cosmos DB for NoSQL 帳戶讀取或寫入資料。 本教學課程使用 Azure Databricks 和 Jupyter 筆記本,藉此說明如何從 Spark 整合 API for NoSQL。 本教學課程著重於 Python 和 Scala,不過您可以使用 Spark 所支援的任何語言或介面。

在本教學課程中,您會了解如何:

  • 使用 Spark 和 Jupyter Notebook 連線到 NoSQL 帳戶的 API。
  • 建立資料庫和容器資源。
  • 將數據內嵌至容器。
  • 查詢容器中的數據。
  • 對容器中的項目執行一般作業。

必要條件

  • 現有的 Azure Cosmos DB for NoSQL 帳戶。
  • 現有 Azure Databricks 工作區。

使用 Spark 和 Jupyter 進行連線

使用現有的 Azure Databricks 工作區建立計算叢集,準備使用 Apache Spark 3.4.x 連線至 Azure Cosmos DB for NoSQL 帳戶。

  1. 開啟您的 Azure Databricks 工作區。

  2. 在工作區介面中,建立新的叢集。 設定叢集時,請至少符合下列設定:

    版本
    執行階段版本 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. 使用工作區介面,從 Maven Central 搜尋具有群組標識碼com.azure.cosmos.sparkMaven 套件。 特別針對Spark 3.4安裝套件,其前置詞azure-cosmos-spark_3-4叢集的成品標識碼

  4. 最後,建立新的筆記本

    提示

    根據預設,筆記本會附加至最近建立的叢集。

  5. 在筆記本中,設定 NoSQL 帳戶端點、資料庫名稱和容器名稱的在線事務處理 (OLTP) 組態設定。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

建立資料庫與容器

使用目錄 API 管理帳戶資源,例如資料庫和容器。 然後,您可以使用 OLTP 來管理容器資源中的數據。

  1. 使用 Spark 設定目錄 API 來管理 NoSQL 資源的 API。

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. 使用 CREATE DATABASE IF NOT EXISTS建立名為 cosmicworks 的新資料庫。

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. 使用 CREATE TABLE IF NOT EXISTS建立名為 products 的新容器。 請確定您已將分割區索引鍵路徑設定為 /category ,並啟用每秒要求單位最大輸送量的 1000 自動調整輸送量。

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 使用階層式分割區索引鍵組態建立名為 employees 的另一個容器。 使用 /organization/department/team 作為分割區索引鍵路徑的集合。 遵循該特定順序。 此外,請將輸送量設定為手動 RU 數量 400

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. 執行筆記本數據格,以驗證您的資料庫和容器是否在 NoSQL 帳戶的 API 內建立。

內嵌資料

建立範例數據集。 然後使用 OLTP 將資料內嵌至 NoSQL 容器的 API。

  1. 建立範例數據集。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. 使用 spark.createDataFrame 和先前儲存的 OLTP 設定,將範例資料新增至目標容器。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

查詢資料

將 OLTP 資料載入資料框架,以對資料執行一般查詢。 您可以使用各種語法來篩選或查詢資料。

  1. 使用 spark.read 將 OLTP 資料載入資料框架物件。 使用您稍早在本教學課程中使用的相同組態。 此外,將 設定spark.cosmos.read.inferSchema.enabledtrue為 ,以允許Spark連接器藉由取樣現有專案來推斷架構。

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. 使用 printSchema呈現在數據框架中載入之數據的架構。

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. 轉譯資料列,其中 quantity 資料行小於 20。 使用 whereshow 函式執行此查詢。

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. 轉譯數據行為 true的第一個數據列clearance。 使用 filter 函式執行此查詢。

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. 轉譯五個沒有篩選或截斷的資料列。 使用 show 函式自訂轉譯的外觀和資料列數目。

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 使用此原始 NoSQL 查詢字串來查詢您的資料: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

執行一般作業

當您在 Spark 中使用適用於 NoSQL 的 API 數據時,您可以執行部分更新,或以原始 JSON 的形式處理數據。

  1. 若要執行專案的部分更新:

    1. 複製現有的 config 組態變數,並修改新複本中的屬性。 具體來說,將寫入策略設定為 ItemPatch。 然後停用大量支援。 設定數據行和對應的作業。 最後,將預設作業類型設定為 Set

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. 針對您打算作為此修補作業目標之一的項目分割區索引鍵和唯一識別碼建立變數。

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. 建立一組修補程式物件來指定目標項目,並指定應修改的欄位。

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. 使用一組修補程序物件建立數據框架。 使用 write 來執行修補作業。

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. 執行查詢以檢閱修補作業的結果。 該項目現在應命名為 Yamba New Surfboard,沒有任何其他變更。

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 若要使用原始 JSON 資料:

    1. 複製現有的 config 組態變數,並修改新複本中的屬性。 具體來說,將目標容器變更為 employees。 然後設定數據 contacts 行/欄位以使用原始 JSON 資料。

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. 建立一組員工以內嵌至容器。

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. 建立資料框架,並使用 write 內嵌員工資料。

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. 使用 show從數據框架轉譯數據。 觀察輸出中的 contacts 資料行是原始 JSON。

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

後續步驟