チュートリアル: Spark を使用して Azure Cosmos DB for NoSQL に接続する
適用対象: NoSQL
このチュートリアルでは、Azure Cosmos DB Spark コネクタを使って、Azure Cosmos DB for NoSQL アカウントのデータの読み取りや書き込みを行います。 このチュートリアルでは、Azure Databricks と Jupyter ノートブックを使って、Spark から API for NoSQL と統合する方法を見ていただきます。 Spark でサポートされている任意の言語またはインターフェイスを使用できますが、このチュートリアルでは Python と Scala に焦点を当てます。
このチュートリアルでは、次の作業を行う方法について説明します。
- Spark と Jupyter ノートブックを使って API for NoSQL アカウントに接続する
- データベースとコンテナー リソースを作成する
- データをコンテナーに取り込む
- コンテナー内のデータのクエリを実行する
- コンテナー内の項目に対して一般的な操作を実行する
前提条件
- 既存の Azure Cosmos DB for NoSQL アカウント。
- Azure サブスクリプションを既にお持ちの場合は、新しいアカウントを作成します。
- Azure サブスクリプションがない場合。 Azure Cosmos DB を無料で試すことができます。クレジット カードは必要ありません。
- 既存の Azure Databricks ワークスペース。
Spark と Jupyter を使用して接続する
Apache Spark 3.4.x を使って Azure Cosmos DB for NoSQL アカウントに接続できる状態のコンピューティング クラスターを、既存の Azure Databricks ワークスペースを使って作成します。
Azure Databricks ワークスペースを開きます。
ワークスペースのインターフェイスで、新しいクラスターを作成します。 少なくとも次の設定でクラスターを構成します。
Value ランタイムのバージョン 13.3 LTS (Scala 2.12、Spark 3.4.1) ワークスペースのインターフェイスを使って、グループ ID が
com.azure.cosmos.spark
である Maven パッケージを Maven Central で検索します。 成果物 ID の前にazure-cosmos-spark_3-4
が付いている Spark 3.4 固有のパッケージを、クラスターにインストールします。最後に、新しいノートブックを作成します。
ヒント
既定では、ノートブックは最近作成されたクラスターにアタッチされます。
ノートブック内で、NoSQL アカウント エンドポイント、データベース名、コンテナー名に関する OLTP 構成設定を設定します。
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
データベースとコンテナーを作成する
Catalog API を使って、データベースやコンテナーなどのアカウント リソースを管理します。 その後、OLTP を使ってコンテナー リソース内のデータを管理できます。
Spark を使って API for NoSQL リソースを管理するように Catalog API を構成します。
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
CREATE DATABASE IF NOT EXISTS
を使用して、cosmicworks
という新しいデータベースを作成します。# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
CREATE TABLE IF NOT EXISTS
を使って、products
という名前の新しいコンテナーを作成します。 パーティション キーのパスを/category
に設定し、最大スループットを1000
RU/秒 (1 秒あたりの要求ユニット数) にして自動スケーリング スループットを有効にします。# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
階層型パーティション キー構成を使って、
employees
という名前の別のコンテナーを作成します。パーティション キー パスのセットとして/organization
、/department
、/team
をこの順序で指定します。 また、手動のスループットを400
RU/秒に設定します# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
ノートブックのセルを実行して、API for NoSQL アカウント内にデータベースとコンテナーが作成されることを検証します。
データの取り込み
サンプル データセットを作成した後、OLTP を使ってそのデータを API for NoSQL コンテナーに取り込みます。
サンプル データ セットを作成します。
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
spark.createDataFrame
と前に保存した OLTP 構成を使って、サンプル データをターゲット コンテナーに追加します。# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
クエリ データ
OLTP データをデータ フレームに読み込み、データに対して一般的なクエリを実行します。 さまざまな構文を使って、データのフィルターまたはクエリを実行できます。
spark.read
を使って、OLTP データをデータフレーム オブジェクトに読み込みます。 このチュートリアルで前に使ったのと同じ構成を使います。 また、spark.cosmos.read.inferSchema.enabled
を true に設定して、Spark コネクタが既存の項目をサンプリングしてスキーマを推論できるようにします。# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
printSchema
を使って、データフレームに読み込まれたデータのスキーマをレンダリングします。# Render schema df.printSchema()
// Render schema df.printSchema()
quantity
列が20
未満のデータ行をレンダリングします。 このクエリを実行するには、where
とshow
関数を使います。# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
clearance
列が true である最初のデータ行をレンダリングします。 このクエリを実行するには、filter
関数を使います。# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
フィルターまたは切り詰めなしで、5 行のデータをレンダリングします。
show
関数を使って、外観とレンダリングされる行数をカスタマイズします。# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
次の生の NoSQL クエリ文字列を使ってデータのクエリを実行します:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
一般的な操作を実行する
Spark で API for NoSQL データを使用するときは、生の JSON として部分的な更新を実行したりデータを操作したりできます。
項目の部分的な更新を実行するには、次の手順のようにします。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、書き込み戦略をItemPatch
に構成し、一括サポートを無効にし、列とマップされた操作を設定し、最後に既定の操作の種類をSet
に設定します。# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
このパッチ操作の一部としてターゲットにする項目のパーティション キーと一意識別子のための変数を作成します。
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
ターゲット項目を指定し、変更する必要があるフィールドを指定するための、パッチ オブジェクトのセットを作成します。
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
パッチ オブジェクトのセットを使ってデータ フレームを作成し、
write
を使ってパッチ操作を実行します。# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
クエリを実行して、パッチ操作の結果を確認します。 これで、項目は
Yamba New Surfboard
という名前になり、それ以外は変更されていないはずです。# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
生の JSON データを操作するには、次の手順のようにします。
既存の
config
構成変数をコピーし、新しいコピーでプロパティを変更します。 具体的には、ターゲット コンテナーをemployees
に変更し、生の JSON データを使うようにcontacts
列とフィールドを構成します。# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
コンテナーに取り込むための一連の従業員を作成します。
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
データ フレームを作成し、
write
を使って従業員データを取り込みます。# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
show
を使ってデータ フレームからデータをレンダリングします。contacts
列が出力で生の JSON であることを確認します。# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
関連するコンテンツ
- Apache Spark
- Azure Cosmos DB Catalog API
- 構成パラメーター参照
- サンプルの "ニューヨーク市タクシー データ" ノートブック
- Spark 2.4 から Spark 3.* に移行する
- バージョンの互換性
- リリース ノート
- ダウンロード リンク