Azure Cosmos DB Spark コネクタを使用してデータを移動する
Azure Synapse Analytics と Azure Synapse Link for Azure Cosmos DB を使用すると、クラウドネイティブのハイブリッド トランザクションと分析処理 (HTAP) を作成して、Azure Cosmos DB for NoSQL のデータに対して分析を実行できます。 この接続により、データ環境の両端 (Azure Cosmos DB と Azure Synapse Analytics) をデータ パイプラインによって統合できるようになります。
セットアップ
まず、アカウント レベルで Synapse Link が有効になっている必要があります。 これは、Azure portal または Azure CLI を使用して実行できます。
az cosmosdb create --name <name> --resource-group <resource-group> --enable-analytical-storage true
また、Azure PowerShell を使用することもできます。
New-AzCosmosDBAccount -ResourceGroupName <resource-group> -Name <name> -Location <location> -EnableAnalyticalStorage true
コンテナーを作成する場合は、コンテナーごとにコンテナー レベルで分析ストレージを有効にする必要があります。 これはポータルを使用して実行できます。
CLI を使用して実行することもできます。
az cosmosdb sql container create --resource-group <resource-group> --account <account> --database <database> --name <name> --partition-key-path <partition-key-path> --throughput <throughput> --analytical-storage-ttl -1
または、Azure PowerShell を使用して実行できます。
New-AzCosmosDBSqlContainer -ResourceGroupName <resource-group> -AccountName <account> -DatabaseName <database> -Name <name> -PartitionKeyPath <partition-key-path> -Throughput <throughput> -AnalyticalStorageTtl -1
ヒント
また、さまざまな開発者 SDK を使用して、コンテナーごとのレベルで分析ストレージを、またはアカウント レベルで Synapse Link を有効または無効にすることもできます。
Azure Cosmos DB から読み込む
注
次の 2 つの Python の例は、Azure Synapse Analytics ワークスペース内で実行する必要があります。
Azure Cosmos DB for NoSQL からデータをクエリするには、2 つのオプションがあります。 まず、メタデータがキャッシュされている Spark DataFrame に読み込むことを選択できます。 この例では、Python を使用して、Azure Cosmos DB for NoSQL アカウントを指定する Spark DataFrame を読み込みます。
productsDataFrame = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "cosmicworks_serv")\
.option("spark.cosmos.container", "products")\
.load()
または、Azure Cosmos DB for NoSQL を直接ポイントする Spark テーブルを作成することもできます。 その後、基になるストアに影響を与えることなく、Spark テーブルに対して SparkSQL クエリを実行できます。 この例では、Python を使用して Spark テーブルを作成します。
create table products_qry using cosmos.olap options (
spark.synapse.linkedService 'cosmicworks_serv',
spark.cosmos.container 'products'
)
Azure Cosmos DB に書き込む
注
次の 2 つの Python の例は、Azure Synapse Analytics ワークスペース内で実行する必要があります。
Spark DataFrame から Azure Cosmos DB に新しいデータを書き込む場合は、次の Python スクリプトを使用して、DataFrame 内のデータを既存のコンテナーに追加できます。
productsDataFrame.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "cosmicworks_serv")\
.option("spark.cosmos.container", "products")\
.mode('append')\
.save()
注
この操作により、既存のトランザクション ワークロードが影響を受け、Azure Cosmos DB for NoSQL コンテナー上の要求ユニットが使用されます。
さらに、チェックポイントから始めて DataFrame からデータをストリーミングできます。 また、このサンプル Python スクリプトを使用して、このストリーミング データを既存のコンテナーに追加することもできます。
query = productsDataFrame\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "cosmicworks_serv")\
.option("spark.cosmos.container", "products")\
.option("checkpointLocation", "/tmp/runIdentifier/")\
.outputMode("append")\
.start()
query.awaitTermination()