你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

快速入门:使用适用于 SQL API 的 Azure Cosmos DB Spark 3 OLTP 连接器管理数据

适用于: SQL API

本教程是一个快速入门指南,介绍如何使用 Cosmos DB Spark 连接器从 Cosmos DB 中读取或向其写入。 Cosmos DB Spark 连接器支持 Spark 3.1.x 和 3.2.x。

在本快速教程中,我们将依赖包含 Spark 3.2.1 的 Azure Databricks Runtime 10.4 和 Jupyter Notebook 来演示如何使用 Cosmos DB Spark 连接器。

你也可以使用任何其他 Spark(例如,spark 3.1.1)产品/服务,而且还应该能够使用 Spark 支持的任何语言(PySpark、Scala、Java 等),或者你熟悉的任何 Spark 界面(Jupyter Notebook、Livy 等)。

先决条件

仅当计划使用日志记录时,才需要 SLF4J。另请下载 SLF4J 绑定,该绑定可将 SLF4J API 与你选择的记录实现链接在一起。 有关详细信息,请参阅 SLF4J 用户手册

使用 Spark 3.2.x 的最新版本在 Spark 群集中安装 Cosmos DB Spark 连接器。

本入门指南基于 PySpark/Scala,你可以在 Azure Databricks PySpark/Scala 笔记本中运行以下代码片段。

创建数据库和容器

首先设置 Cosmos DB 帐户凭据,以及 Cosmos DB 数据库名称和容器名称。

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

接下来,可以使用新的目录 API 通过 Spark 创建 Cosmos DB 数据库和容器。

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

使用目录 API 创建容器时,可为要创建的容器设置吞吐量和分区键路径

有关详细信息,请参阅完整的目录 API 文档。

引入数据

数据源的名称为 cosmos.oltp,以下示例演示如何在 Cosmos DB 中写入由两个项构成的内存数据帧:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

请注意,id 是 Cosmos DB 的必填字段。

有关引入数据的详细信息,请参阅完整的写入配置文档。

查询数据

可以使用同一 cosmos.oltp 数据源查询数据,并使用 filter 来下推筛选器:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

有关查询数据的详细信息,请参阅完整的查询配置文档。

使用修补程序进行部分文档更新

使用同一 cosmos.oltp 数据源时,可以使用修补程序 API 在 Cosmos DB 中执行部分更新:

cfgPatch = {"spark.cosmos.accountEndpoint": cosmosEndpoint,
          "spark.cosmos.accountKey": cosmosMasterKey,
          "spark.cosmos.database": cosmosDatabaseName,
          "spark.cosmos.container": cosmosContainerName,
          "spark.cosmos.write.strategy": "ItemPatch",
          "spark.cosmos.write.bulk.enabled": "false",
          "spark.cosmos.write.patch.defaultOperationType": "Set",
          "spark.cosmos.write.patch.columnConfigs": "[col(name).op(set)]"
          }

id = "<document-id>"
query = "select * from cosmosCatalog.{}.{} where id = '{}';".format(
    cosmosDatabaseName, cosmosContainerName, id)

dfBeforePatch = spark.sql(query)
print("document before patch operation")
dfBeforePatch.show()

data = [{"id": id, "name": "Joel Brakus"}]
patchDf = spark.createDataFrame(data)

patchDf.write.format("cosmos.oltp").mode("Append").options(**cfgPatch).save()

dfAfterPatch = spark.sql(query)
print("document after patch operation")
dfAfterPatch.show()

有关与部分文档更新相关的更多示例,请参阅 Github 代码示例修补程序示例

架构推理

查询数据时,Spark 连接器可以通过将 spark.cosmos.read.inferSchema.enabled 设置为 true,基于对现有项的采样来推理架构。

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()


# Alternatively, you can pass the custom schema you want to be used to read the data:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

# If no custom schema is specified and schema inference is disabled, then the resulting data will be returning the raw Json content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

有关架构推理的详细信息,请参阅完整的架构推理配置文档。

配置参考

适用于 SQL API 的 Azure Cosmos DB Spark 3 OLTP 连接器有一个完整的配置引用,该引用提供额外的高级设置,用于写入和查询数据、序列化、使用更改源进行流式处理、分区和吞吐量管理,等等。 如需包含详细信息的完整列表,请参阅 GitHub 上的 Spark 连接器配置参考

迁移到 Spark 3 连接器

如果你使用的是较旧的 Spark 2.4 连接器,可以在此处了解如何迁移到 Spark 3 连接器。

后续步骤