チュートリアル: Spark を使用して Azure Cosmos DB for NoSQL に接続する

適用対象: NoSQL

このチュートリアルでは、Azure Cosmos DB Spark コネクタを使って、Azure Cosmos DB for NoSQL アカウントのデータの読み取りや書き込みを行います。 このチュートリアルでは、Azure Databricks と Jupyter ノートブックを使って、Spark から API for NoSQL と統合する方法を見ていただきます。 Spark でサポートされている任意の言語またはインターフェイスを使用できますが、このチュートリアルでは Python と Scala に焦点を当てます。

このチュートリアルでは、次の作業を行う方法について説明します。

  • Spark と Jupyter ノートブックを使って API for NoSQL アカウントに接続する
  • データベースとコンテナー リソースを作成する
  • データをコンテナーに取り込む
  • コンテナー内のデータのクエリを実行する
  • コンテナー内の項目に対して一般的な操作を実行する

前提条件

  • 既存の Azure Cosmos DB for NoSQL アカウント。
  • 既存の Azure Databricks ワークスペース。

Spark と Jupyter を使用して接続する

Apache Spark 3.4.x を使って Azure Cosmos DB for NoSQL アカウントに接続できる状態のコンピューティング クラスターを、既存の Azure Databricks ワークスペースを使って作成します。

  1. Azure Databricks ワークスペースを開きます。

  2. ワークスペースのインターフェイスで、新しいクラスターを作成します。 少なくとも次の設定でクラスターを構成します。

    Value
    ランタイムのバージョン 13.3 LTS (Scala 2.12、Spark 3.4.1)
  3. ワークスペースのインターフェイスを使って、グループ IDcom.azure.cosmos.spark である Maven パッケージを Maven Central で検索します。 成果物 ID の前に azure-cosmos-spark_3-4 が付いている Spark 3.4 固有のパッケージを、クラスターにインストールします。

  4. 最後に、新しいノートブックを作成します。

    ヒント

    既定では、ノートブックは最近作成されたクラスターにアタッチされます。

  5. ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名に関する OLTP 構成設定を設定します。

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

データベースとコンテナーを作成する

Catalog API を使って、データベースやコンテナーなどのアカウント リソースを管理します。 その後、OLTP を使ってコンテナー リソース内のデータを管理できます。

  1. Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. CREATE DATABASE IF NOT EXISTS を使用して、cosmicworks という新しいデータベースを作成します。

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. CREATE TABLE IF NOT EXISTS を使って、products という名前の新しいコンテナーを作成します。 パーティション キーのパスを /category に設定し、最大スループットを 1000 RU/秒 (1 秒あたりの要求ユニット数) にして自動スケーリング スループットを有効にします。

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. 階層型パーティション キー構成を使って、employees という名前の別のコンテナーを作成します。パーティション キー パスのセットとして /organization/department/team をこの順序で指定します。 また、手動のスループットを 400 RU/秒に設定します

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. ノートブックのセルを実行して、API for NoSQL アカウント内にデータベースとコンテナーが作成されることを検証します。

データの取り込み

サンプル データセットを作成した後、OLTP を使ってそのデータを API for NoSQL コンテナーに取り込みます。

  1. サンプル データ セットを作成します。

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. spark.createDataFrame と前に保存した OLTP 構成を使って、サンプル データをターゲット コンテナーに追加します。

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

クエリ データ

OLTP データをデータ フレームに読み込み、データに対して一般的なクエリを実行します。 さまざまな構文を使って、データのフィルターまたはクエリを実行できます。

  1. spark.read を使って、OLTP データをデータフレーム オブジェクトに読み込みます。 このチュートリアルで前に使ったのと同じ構成を使います。 また、spark.cosmos.read.inferSchema.enabled を true に設定して、Spark コネクタが既存の項目をサンプリングしてスキーマを推論できるようにします。

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. printSchema を使って、データフレームに読み込まれたデータのスキーマをレンダリングします。

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. quantity 列が 20 未満のデータ行をレンダリングします。 このクエリを実行するには、whereshow 関数を使います。

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. clearance 列が true である最初のデータ行をレンダリングします。 このクエリを実行するには、filter 関数を使います。

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. フィルターまたは切り詰めなしで、5 行のデータをレンダリングします。 show 関数を使って、外観とレンダリングされる行数をカスタマイズします。

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. 次の生の NoSQL クエリ文字列を使ってデータのクエリを実行します: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

一般的な操作を実行する

Spark で API for NoSQL データを使用するときは、生の JSON として部分的な更新を実行したりデータを操作したりできます。

  1. 項目の部分的な更新を実行するには、次の手順のようにします。

    1. 既存の config 構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、書き込み戦略を ItemPatch に構成し、一括サポートを無効にし、列とマップされた操作を設定し、最後に既定の操作の種類を Set に設定します。

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. このパッチ操作の一部としてターゲットにする項目のパーティション キーと一意識別子のための変数を作成します。

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. ターゲット項目を指定し、変更する必要があるフィールドを指定するための、パッチ オブジェクトのセットを作成します。

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. パッチ オブジェクトのセットを使ってデータ フレームを作成し、write を使ってパッチ操作を実行します。

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. クエリを実行して、パッチ操作の結果を確認します。 これで、項目は Yamba New Surfboard という名前になり、それ以外は変更されていないはずです。

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. 生の JSON データを操作するには、次の手順のようにします。

    1. 既存の config 構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ターゲット コンテナーを employees に変更し、生の JSON データを使うように contacts 列とフィールドを構成します。

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. コンテナーに取り込むための一連の従業員を作成します。

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. データ フレームを作成し、write を使って従業員データを取り込みます。

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. show を使ってデータ フレームからデータをレンダリングします。 contacts 列が出力で生の JSON であることを確認します。

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

次のステップ