Azure Cosmos DB Spark コネクタを使用してデータを移動する

完了

Azure Synapse AnalyticsAzure Synapse Link for Azure Cosmos DB を使用すると、クラウドネイティブのハイブリッド トランザクションと分析処理 (HTAP) を作成して、Azure Cosmos DB for NoSQL のデータに対して分析を実行できます。 この接続により、データ環境の両端 (Azure Cosmos DB と Azure Synapse Analytics) をデータ パイプラインによって統合できるようになります。

セットアップ

まず、アカウント レベルで Synapse Link が有効になっている必要があります。 これは、Azure portal または Azure CLI を使用して実行できます。

az cosmosdb create --name <name> --resource-group <resource-group> --enable-analytical-storage true

また、Azure PowerShell を使用することもできます。

New-AzCosmosDBAccount -ResourceGroupName <resource-group> -Name <name>  -Location <location> -EnableAnalyticalStorage true

コンテナーを作成する場合は、コンテナーごとにコンテナー レベルで分析ストレージを有効にする必要があります。 これはポータルを使用して実行できます。

CLI を使用して実行することもできます。

az cosmosdb sql container create --resource-group <resource-group> --account <account> --database <database> --name <name> --partition-key-path <partition-key-path> --throughput <throughput> --analytical-storage-ttl -1

または、Azure PowerShell を使用して実行できます。

New-AzCosmosDBSqlContainer -ResourceGroupName <resource-group> -AccountName <account> -DatabaseName <database> -Name <name> -PartitionKeyPath <partition-key-path> -Throughput <throughput> -AnalyticalStorageTtl -1

ヒント

また、さまざまな開発者 SDK を使用して、コンテナーごとのレベルで分析ストレージを、またはアカウント レベルで Synapse Link を有効または無効にすることもできます。

Azure Cosmos DB から読み込む

次の 2 つの Python の例は、Azure Synapse Analytics ワークスペース内で実行する必要があります。

Azure Cosmos DB for NoSQL からデータをクエリするには、2 つのオプションがあります。 まず、メタデータがキャッシュされている Spark DataFrame に読み込むことを選択できます。 この例では、Python を使用して、Azure Cosmos DB for NoSQL アカウントを指定する Spark DataFrame を読み込みます。

productsDataFrame = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .load()

または、Azure Cosmos DB for NoSQL を直接ポイントする Spark テーブルを作成することもできます。 その後、基になるストアに影響を与えることなく、Spark テーブルに対して SparkSQL クエリを実行できます。 この例では、Python を使用して Spark テーブルを作成します。

create table products_qry using cosmos.olap options (
    spark.synapse.linkedService 'cosmicworks_serv',
    spark.cosmos.container 'products'
)

Azure Cosmos DB に書き込む

次の 2 つの Python の例は、Azure Synapse Analytics ワークスペース内で実行する必要があります。

Spark DataFrame から Azure Cosmos DB に新しいデータを書き込む場合は、次の Python スクリプトを使用して、DataFrame 内のデータを既存のコンテナーに追加できます。

productsDataFrame.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .mode('append')\
    .save()

この操作により、既存のトランザクション ワークロードが影響を受け、Azure Cosmos DB for NoSQL コンテナー上の要求ユニットが使用されます。

さらに、チェックポイントから始めて DataFrame からデータをストリーミングできます。 また、このサンプル Python スクリプトを使用して、このストリーミング データを既存のコンテナーに追加することもできます。

query = productsDataFrame\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .option("checkpointLocation", "/tmp/runIdentifier/")\
    .outputMode("append")\
    .start()

query.awaitTermination()