你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Azure Synapse Spark 的 Common Data Model (CDM) 连接器

Synapse Spark Common Data Model (CDM) 格式读取器/写入器使 Spark 程序能够通过 Spark 数据帧在 CDM 文件夹中读取和写入 CDM 实体。

有关使用 CDM 1.0 定义 CDM 文档的信息,请参阅 什么是 CDM 以及如何使用它

高级功能

以下功能受支持:

  • 将数据从 CDM 文件夹中的实体读入到 Spark 数据帧中
  • 基于 CDM 实体定义从 Spark 数据帧写入到 CDM 文件夹中的实体
  • 基于数据帧架构从 Spark 数据帧写入到 CDM 文件夹中的实体

功能

  • 支持读取和写入到启用了 HNS 的 ADLS gen2 中的 CDM 文件夹。
  • 支持从清单或 model.json 文件描述的 CDM 文件夹进行读取。
  • 支持写入到清单文件描述的 CDM 文件夹。
  • 支持带有用户可选分隔符字符的 CSV 格式的数据,带或不带列标题均可。
  • 支持 Apache Parquet 格式的数据,包括嵌套的 Parquet。
  • 支持在读取时使用子清单,以及在写入时选择使用实体范围内的子清单。
  • 支持使用用户可修改的分区模式写入数据。
  • 支持使用托管标识 Synapse 和凭据。
  • 支持使用 config.json 中所述的 CDM 适配器定义解析导入中使用的 CDM 别名位置。

限制

不支持以下方案:

  • 读取实体后,以编程方式访问实体元数据。
  • 写入实体时,以编程方式进行访问来设置或替代元数据。
  • 架构偏差 - 要写入的数据帧中的数据包含实体定义中未包含的额外属性。
  • 架构演变 - 实体分区引用不同版本的实体定义
  • 不支持对 model.json 的写入支持。
  • 执行 com.microsoft.cdm.BuildInfo.version 将验证版本

支持 Spark 2.4 和 Spark 3.1。

读取数据

在读取数据时,连接器根据指定实体的已解析实体定义(已在清单中引用),使用 CDM 文件夹中的元数据创建数据帧。 实体属性名称用作数据帧列名称。 属性数据类型将映射到列数据类型。 加载数据帧时,它会从清单中标识的实体分区内填充。

连接器在指定的清单以及任何一级子清单中查找指定的实体。 如果所需的实体在二级或更低的子清单中,或者如果在不同的子清单中有多个同名的实体,则用户应指定包含所需实体的子清单,而非根清单。 实体分区可以采用混合格式(CSV、Parquet 等)。 清单中标识的所有实体数据文件将合并到一个数据集中(而无论格式如何)并加载到数据帧中。

读取 CSV 数据时,连接器默认使用 Spark FAILFAST 选项。 如果列数与实体中的属性数目不相等,则连接器将返回错误。 此外,从 0.19 开始,许可模式现已受支持。 只有 CSV 文件支持此模式。 使用许可模式时,如果 CSV 行的列数少于实体架构列数,将为缺少的列指定 null 值。 如果 CSV 行的列数多于实体架构的列数,则超出实体架构列计数的列将被截去,使得 CSV 行的列数等于架构列计数。 用法如下:

  .option("entity", "permissive") or .option("mode", "failfast")

例如,此处是一个 python 示例。

写入数据

写入到 CDM 文件夹时,如果 CDM 文件夹中尚不存在该实体,则会创建一个新的实体和定义并将其添加到 CDM 文件夹中,并且在清单中对其进行引用。 支持两种写入模式:

显式写入:物理实体定义基于用户指定的逻辑 CDM 实体定义。

  • 指定的逻辑实体定义将被读取并解析,以创建在 CDM 文件夹中使用的物理实体定义。 如果任何直接或间接引用的 CDM 定义文件中的 import 语句包含别名,则必须提供用于将这些别名映射到 CDM 适配器和存储位置的 config.json 文件。 有关使用别名的详细信息,请参阅下面的别名和适配器配置
  • 如果数据帧架构与引用的实体定义不匹配,则会返回错误。 确保数据帧中的列数据类型与实体中的属性数据类型匹配,包括通过 CDM 中的特征设置的十进制数据、精度和小数位数。
  • 如果数据帧与实体定义不一致,将返回错误。
  • 如果数据帧一致:
    • 如果该实体已存在于清单中,则将根据 CDM 文件夹中的定义解析并验证提供的实体定义。 如果定义不匹配,则会返回错误,否则会写入数据,并更新清单中的分区信息
      • 如果该实体不存在于 CDM 文件夹中,则会将该实体定义的已解析副本写入到 CDM 文件夹中的清单并写入数据,并且更新清单中的分区信息。

隐式写入:实体定义派生自数据帧结构。

  • 如果实体不存在于 CDM 文件夹中,则会使用隐式定义在目标 CDM 文件夹中创建已解析的实体定义。
  • 如果实体存在于 CDM 文件夹中,则将根据现有实体定义验证隐式定义。 如果定义不匹配,则将返回错误,否则会写入数据,并会将派生的逻辑实体定义写入到实体文件夹的子文件夹中。 数据将写入到实体子文件夹内的数据文件夹中。 保存模式将确定是将新数据覆盖现有数据或追加到现有数据之后,还是将返回错误(如果数据存在)。 如果数据已存在,则默认会返回错误。

CDM 别名集成

CDM 定义文件在 import 语句中使用别名来简化 import 语句,并使得所导入内容的位置可以稍后在执行时绑定。 使用别名可以:

  • 利于简化 CDM 文件的组织,以便将位于不同位置的相关 CDM 定义组合到一起。
  • 允许在运行时从不同的已部署位置访问 CDM 内容。

下面的代码片段演示了如何在 CDM 定义文件中的 import 语句内使用别名。

"imports": [  
{     
  "corpusPath": "cdm:/foundations.cdm.json"
},  
{       
  "corpusPath": "core:/TrackedEntity.cdm.json"  
},  
{      
  "corpusPath": "Customer.cdm.json"  
} 
]

在上面的示例中,“cdm”用作 CDM 基础文件位置的别名,“core”用作 TrackedEntity 定义文件位置的别名。

别名是与 CDM config.json 文件中适配器条目内的命名空间值相匹配的文本标签。 适配器条目指定适配器类型(例如 “adls”、“CDN”、“GitHub”、“local”等)以及定义某个位置的 URL。 某些适配器支持其他配置选项,例如连接超时。 尽管别名为任意文本标签,但“cdm”别名是以一种特殊方式处理的,如下所述。

Spark CDM 连接器将在实体定义模型根位置查找要加载的 config.json 文件。 如果 config.json 文件位于其他某个位置,或用户试图替代模型根中的 config.json 文件,则用户可以使用 configPath 选项提供 config.json 文件的位置。 config.json 文件必须包含要解析的 CDM 代码中使用的所有别名的适配器条目,否则将报告错误。

由于能够替代 config.json,用户可以为 CDM 定义提供运行时可访问的位置。 请确保在运行时引用的内容与最初创作 CDM 时使用的定义一致。

按照约定,cdm 别名用于引用根级别标准 CDM 定义(包括 foundations.cdm.json 文件,该文件中包含 CDM 基元数据类型和大多数 cdm 实体定义所需的一系列核心特征定义)的位置。 在 config.json 文件中使用适配器条目,可以像解析任何其他别名一样解析 cdm 别名。 此外,如果未指定适配器或提供了空条目,则默认情况下会将 cdm 别名解析为 https://cdm-schema.microsoft.com/logical/ 上的 CDM 公共 CDN。 用户还可以使用 cdmSource 选项替代 cdm 别名的解析方式(请参阅下面的选项详细信息)。 如果 cdm 别名是要解析的 CDM 定义中使用的唯一别名,则使用 cdmsource 选项非常有用,因为这样便可以无需创建或引用 config.json 文件。

参数、选项和保存模式

对于读取和写入,Spark CDM 连接器库名称是作为参数提供的。 一组选项用于将连接器的行为参数化。 写入时,还会支持一个保存模式。

连接器库名称、选项和保存模式的格式如下:

  • dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
  • dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.<saveMode>)

下面是一个有关如何使用连接器进行读取的示例,其中显示了一些选项。 稍后会提供更多示例。

val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "customerleads/default.manifest.cdm.json")
  .option("entity", "Customer")
  .load()

常见的读取和写入选项

以下选项用于标识 CDM 文件夹中要读取或写入的实体。

选项 说明 模式和示例用法
存储 CDM 文件夹所在的启用了 HNS 的 ADLS gen2 存储帐户的终结点 URL。
使用 dfs.core.windows.net URL
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net"
manifestPath 存储帐户中清单或 model.json 文件的相对路径。 对于读取,该选项可以是根清单、子清单或 model.json。 对于写入,该选项必须是根清单。 <container>/{<folderPath>/}<manifestFileName>,
"mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json"
"models/hr/employees/model.json"(只读)
实体 清单中源或目标实体的名称。 首次在文件夹中写入实体时,将为解析的实体定义指定此名称。 实体名称区分大小写。 <entityName>
"customer"
maxCDMThreads 解析实体定义时的最大并发读取数。 任何有效的整数。 例如 - 5

注意

读取时,不再需要在指定 CDM 文件夹中的物理实体定义之外还指定逻辑实体定义。

显式写入选项

以下选项标识了用于定义要写入的实体的逻辑实体定义。 逻辑实体定义将解析为用于定义实体写入方式的物理定义。

选项 说明 模式/示例用法
entityDefinitionStorage 包含实体定义的 ADLS gen2 存储帐户。 如果与托管 CDM 文件夹的存储帐户不同,则该选项是必需的。 <accountName>.dfs.core.windows.net
"myAccount.dfs.core.windows.net"
entityDefinitionModelRoot 模型根或语料库在帐户中的位置。 <container>/<folderPath>
"crm/core"
entityDefinitionPath 实体的位置。 相对于模型根的 CDM 定义文件的文件路径,包括该文件中的实体的名称。 <folderPath>/<entityName>.cdm.json/<entityName>
"sales/customer.cdm.json/customer"
configPath config.json 文件的容器和文件夹路径,该文件包含实体定义文件及任何直接或间接引用的 CDM 文件中包含的所有别名的适配器配置。 如果 config.json 位于模型根文件夹中,则该选项不是必需的。 <container><folderPath>
useCdmStandardModelRoot 指示模型根位于 https://cdm-schema.microsoft.com/CDM/logical/
用于引用 CDM 内容分发网络 (CDN) 中定义的实体类型。
替代项:entityDefinitionStorage、entityDefinitionModelRoot(如果已指定)。
"useCdmStandardModelRoot"
cdmSource 定义如何解析“cdm”别名(如果存在于 CDM 定义文件中)。 如果使用此选项,它将替代 config.json 文件中指定的任何 cdm 适配器。 值为“builtin”或“referenced”。 默认值为“referenced”
如果该选项设置为 referenced,则会使用 https://cdm-schema.microsoft.com/logical/ 上最新发布的标准 CDM 定义。 如果该选项设置为 builtin,则将使用 Spark CDM 连接器使用的 CDM 对象模型中内置的 CDM 基定义。
注意:
1)。 Spark CDM 连接器可能未在使用最新的 CDM SDK,因此可能不包含最新发布的标准定义。
第 2 期)。 内置定义仅包括顶级 CDM 内容,例如 foundations.cdm.json、primitives.cdm.json 等。如果要使用较低级别的标准 CDM 定义,请使用 referenced 或在 config.json 中包括 cdm 适配器。
"builtin"|"referenced"。

在以上示例中,客户实体定义对象的完整路径为:https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customer,其中“models”是 ADLS 中的容器。

显式写入选项

如果未在写入时指定逻辑实体定义,则将基于数据帧架构隐式写入实体。

隐式写入时,时间戳列通常被解释为 CDM DateTime 数据类型。 这可被替代以创建 CDM Time 数据类型的属性,具体方式是提供与指定该数据类型的列关联的元数据对象。 有关详细信息,请参阅下面的“处理 CDM Time 数据”。

最初,仅 CSV 文件支持此功能。 在更高版本中将添加对将 Time 数据写入到 Parquet 的支持。

文件夹结构和数据格式选项

可以使用以下选项更改文件夹组织和文件格式。

选项 说明 模式/示例用法
useSubManifest 如果此选项为 true,则会导致通过子清单将目标实体包含在根清单中。 子清单和实体定义将写入到根下的一个实体文件夹中。 默认值为 false。 "true"|"false"
format 定义文件格式。 当前支持的文件格式为 CSV 和 Parquet。 默认值为“csv” "csv"|"parquet"
delimiter 仅 CSV。 定义使用的分隔符。 默认值为逗号。 "|"
columnHeaders 仅 CSV。 如果此选项为 true,则会将包含列标题的第一行添加到数据文件。 默认值为“true” "true"|"false"
compression 仅写入。 仅限 Parquet。 定义使用的压缩格式。 默认值为“snappy” "uncompressed" | "snappy" | "gzip" | "lzo"。
dataFolderFormat 允许在实体文件夹内使用用户定义的数据文件夹结构。 允许使用要通过 DateTimeFormatter 格式设置替换到文件夹名称中的日期和时间值。 非格式化程序内容必须用单引号引起来。 默认格式是 "yyyy"-"MM"-"dd" 生成的文件夹名称,如 2020-07-30 year "yyyy" / month "MM"
"Data"

保存模式

保存模式指定在写入数据帧时如何处理 CDM 文件夹中的现有实体数据。 选项包括覆盖、追加,或者在数据已存在时引发错误。 默认的保存模式为 ErrorIfExists

模式 说明
SaveMode.Overwrite 如果现有实体定义发生更改,将覆盖现有的实体定义,并将现有数据分区替换为要写入的数据分区。
SaveMode.Append 会将要写入的数据追加到现有分区旁边的新分区中。
注意:追加不支持更改架构;如果要写入的数据的架构与现有实体定义不兼容,则会引发错误。
SaveMode.ErrorIfExists 如果分区已存在,则将返回错误。

请参阅下面的文件夹和文件组织,详细了解如何在写入时对数据文件进行命名和组织。

身份验证

有以下三种身份验证模式可与 Spark CDM 连接器配合使用,用来读取/写入 CDM 元数据和数据分区:凭据直通、SasToken 和应用注册。

凭据直通

在 Synapse 中,Spark CDM 连接器支持使用适用于 Azure 资源的托管标识来居间促成对包含 CDM 文件夹的 Azure 数据湖存储帐户的访问。 托管标识是为每个 Synapse 工作区自动创建的。 连接器使用工作区的托管标识,该工作区包含一个笔记本,其中调用了连接器以便向要访问的存储帐户进行身份验证。

必须确保向使用的标识授予对相应存储帐户的访问权限。 授予“存储 blob 数据参与者”以允许库写入到 CDM 文件夹,或授予“存储 blob 数据读取者”以只允许读取访问。 在这两种情况下,均不需要额外的连接器选项。

SAS 令牌访问控制选项

针对存储帐户的 SaS 令牌凭据身份验证是用于向存储进行身份验证的额外选项。 使用 SAS 令牌身份验证时,SaS 令牌可以位于容器或文件夹级别。 需要相应的权限(读取/写入)– 读取清单/分区只需要读取级支持,而写入则要求读写支持。

选项 说明 模式和示例用法
sasToken 用于使用正确权限访问相对 storageAccount 的 sastoken <token>

基于凭据的访问控制选项

作为使用托管标识或用户标识的替代方法,可以提供显式凭据来使 Spark CDM 连接器可以访问数据。 在 Azure Active Directory 中,创建一个应用注册,然后使用以下任一角色向此应用注册授予对存储帐户的访问权限:“存储 Blob 数据参与者”角色(用于允许库写入到 CDM 文件夹)或“存储 Blob 数据读取者”角色(用于仅允许读取)。

创建权限后,可以在每次调用连接器时使用以下选项将应用 ID、应用密钥和租户 ID 传递到该连接器。 建议使用 Azure 密钥保管库来保护这些值,以确保它们不会以明文形式存储在笔记本文件中。

选项 说明 模式和示例用法
appId 用于向存储帐户进行身份验证的应用注册 ID <guid>
appKey 已注册的应用密钥或机密 <加密的机密>
tenantId 用于注册应用的 Azure Active Directory 租户 ID。 <guid>

示例

下面的示例均使用已在代码中基于一个 Azure 应用注册进行了初始化的 appId、appKey 和 tenantId 变量,该 Azure 应用注册已被授予存储上的“存储 Blob 数据参与者”权限(用于写入)和“存储 Blob 数据读取者”权限(用于读取)。

读取

此代码从 CDM 文件夹(其中的 mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json 内有清单)读取 Person 实体。

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

隐式写入 – 只使用数据帧架构

此代码将数据帧 df 写入到 CDM 文件夹(其中有一个包含 Event 实体的清单位于 mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json)。

Event 数据将作为追加到文件夹中的 parquet 文件(已通过 gzip 进行了压缩)写入(新文件会被添加,但不会删除现有文件)。


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

显式写入 - 使用存储在 ADLS 中的实体定义

此代码将数据帧 df 写入到 CDM 文件夹(其中有一个包含 Person 实体的清单位于 https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json)。 Person 数据将作为新的 CSV 文件写入(在默认情况下),这些文件将覆盖文件夹中的现有文件。 Person 实体定义是从 https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json 检索的

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

显式写入 - 使用 CDM GitHub 中定义的实体

此代码会将数据帧 df 写入到 CDM 文件夹(其中有一个清单位于 https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json,并且有一个包含 TeamMembership 实体的子清单,该子清单是在TeamMembership 子目录中创建的)。 TeamMembership 数据将写入到 CSV 文件(默认值),这些 CSV 文件将覆盖任何现有数据文件。 TeamMembership 实体定义是从位于以下网址的 CDM CDN 检索的:https://cdm-schema.microsoft.com/logical/core/applicationCommon/TeamMembership.cdm.json

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/Tea
mMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

其他注意事项

Spark 到 CDM 的数据类型映射

向/从 Spark 转换 CDM 时,将应用以下数据类型映射。

Spark CDM
ShortType SmallInteger
IntegerType 整数
LongType BigInteger
DateType Date
时间戳 DateTime(可以选择使用 Time,请参阅下文)
StringType 字符串
DoubleType Double
DecimalType(x,y) Decimal (x,y)(默认的小数位数和精度是 18,4)
FloatType Float
BooleanType 布尔值
ByteType Byte

不支持 CDM Binary 数据类型。

处理 CDM Date、DateTime 和 DateTimeOffset 数据

CDM Date 和 DateTime 数据类型值对于 Spark 和 Parquet 而言是照常处理的,在 CSV 中以 ISO 8601 格式读取/写入。

CDM DateTime 数据类型值会解释为 UTC,在 CSV 中以 ISO 8601 格式写入,例如 2020-03-13 09:49:00Z。

在 Spark 和 Parquet 中,用于记录本地时间瞬间的 CDM DateTimeOffset 值是以与 CSV 不同的方式处理的。 虽然 CSV 和其他格式可以将本地时间瞬间表示为一种结构(由日期时间和 UTC 偏移量组成,格式为 CSV 格式,例如 2020-03-13 09:49:00-08:00),但 Parquet 和 Spark 不支持这样的结构。 相反,它们使用 TIMESTAMP 数据类型,该数据类型允许以 UTC 时间格式(或在某个未指定的时区中)记录瞬间。

Spark CDM 连接器会将 CSV 中的 DateTimeOffset 值转换为 UTC 时间戳。 这将在 parquet 中保留为 Timestamp,如果随后保留为 CSV,则该值将序列化为 DateTimeOffset,其偏移量为 +00:00。 重要的是,时间的准确度无损 - 尽管偏移量丢失,但序列化值表示与原始值相同的瞬间。 Spark 系统使用其系统时间作为基线,通常使用本地时间表示时间。 始终可以通过应用本地系统偏移量来计算 UTC 时间。 对于所有区域的 Azure 系统,系统时间始终为 UTC,因此所有时间戳值通常都采用 UTC 格式。

由于 Azure 系统值始终为 UTC,因此在使用隐式写入(其中 CDM 定义派生自数据帧)时,时间戳列将转换为具有 CDM DateTime 数据类型的属性,该数据类型意味着 UTC 时间。

如果保留本地时间很重要,并且数据将在 Spark 中处理或以 parquet 格式保留,则建议使用 DateTime 属性,并在单独的属性中保留偏移量,例如,作为表示分钟数的带符号整数值。 在 CDM 中,DateTime 值为 UTC,因此,当需要使用偏移量计算本地时间时,必须应用偏移量。

在大多数情况下,保留本地时间并不重要。 通常仅在 UI 中需要本地时间以方便用户使用,并且本地时间基于用户的时区,因此不存储 UTC 时间通常是更好的解决方案。

处理 CDM 时间数据

Spark 不支持显式 Time 数据类型。 具有 CDM Time 数据类型的属性在 Spark 数据帧中表示为具有 Timestamp 数据类型的列。 读取时间值时,数据帧中的时间戳将使用 Spark epoch 日期 01/01/1970 加上从源读取的时间值进行初始化。

使用显式写入时,时间戳列可以映射到 DateTime 或 Time 属性。 如果时间戳映射到 Time 属性,时间戳的日期部分将被去除。

使用隐式写入时,时间戳列默认映射到 DateTime 属性。 若要将时间戳列映射到 Time 属性,必须将元数据对象添加到数据帧中的列,以指示时间戳应解释为时间值。 下面的代码演示如何在 Scala 中完成此过程。

val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))

时间值准确度

基于要读取的文件(CSV 或 Parquet)中的数据格式或数据帧中定义的数据格式,Spark CDM 连接器支持 DateTime 或 Time 数据类型的时间值(秒数最多具有 6 个小数位),从而实现从单秒到微秒的准确度。

文件夹和文件命名与组织

写入 CDM 文件夹时会使用下面所示的默认文件夹组织。

默认情况下,数据文件将写入到为当前日期创建的文件夹,例如,名为“2010-07-31”的文件夹。 可以使用 dateFolderFormat 选项自定义文件夹结构和名称,如前文所述。

数据文件名称基于以下模式:<entity>-<jobid>-*.<fileformat>。

可以使用 sparkContext.parallelize() 方法控制写入的数据分区数。 分区数由 Spark 群集中的执行程序数确定,也可以显式指定。 下面的 Scala 示例创建包含两个分区的数据帧。

val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)

显式写入(由引用的实体定义来定义)

+-- <CDMFolder>
     |-- default.manifest.cdm.json     << with entity ref and partition info
     +-- <Entity>
          |-- <entity>.cdm.json        << resolved physical entity definition
          |-- <data folder>
          |-- <data folder>
          +-- ...                            

使用子清单进行的显式写入:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to sub-manifest
    +-- <Entity>
         |-- <entity>.cdm.json
         |-- <entity>.manifest.cdm.json << sub-manifest with partition info
         |-- <data folder>
         |-- <data folder>
         +-- ...

隐式(实体定义派生自数据帧架构):

+-- <CDMFolder>
    |-- default.manifest.cdm.json
    +-- <Entity>
         |-- <entity>.cdm.json          << resolved physical entity definition
         +-- LogicalDefinition
         |   +-- <entity>.cdm.json      << logical entity definition(s)
         |-- <data folder>
         |-- <data folder>
         +-- ...

使用子清单进行的隐式写入:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to sub-manifest
    +-- <Entity>
        |-- <entity>.cdm.json           << resolved physical entity definition
        |-- <entity>.manifest.cdm.json  << sub-manifest with reference to the entity and partition info
        +-- LogicalDefinition
        |   +-- <entity>.cdm.json       << logical entity definition(s)
        |-- <data folder>
        |-- <data folder>
        +-- ...

示例

请参阅 https://github.com/Azure/spark-cdm-connector/tree/master/samples 以获取示例代码和 CDM 文件。

示例

下面的示例均使用已在代码中基于一个 Azure 应用注册进行了初始化的 appId、appKey 和 tenantId 变量,该 Azure 应用注册已被授予存储上的“存储 Blob 数据参与者”权限(用于写入)和“存储 Blob 数据读取者”权限(用于读取)。

读取

此代码从 CDM 文件夹(其中的 mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json 内有清单)读取 Person 实体。

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

隐式写入 – 只使用数据帧架构

此代码将数据帧 df 写入到 CDM 文件夹(其中有一个包含 Event 实体的清单位于 mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json)。

Event 数据将作为追加到文件夹中的 Parquet 文件(已通过 gzip 进行了压缩)写入(新文件会被添加,而不会删除现有文件)。


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

显式写入 - 使用存储在 ADLS 中的实体定义

此代码将数据帧 df 写入到 CDM 文件夹(其中有一个包含 Person 实体的清单位于 https://mystorage.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json)。 Person 数据将作为新的 CSV 文件写入(在默认情况下),这些文件将覆盖文件夹中的现有文件。 Person 实体定义是从 https://mystorage.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json 检索的

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

显式写入 - 使用 CDM GitHub 中定义的实体

此代码会将数据帧 df 写入到 CDM 文件夹(其中有一个清单位于 https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json,并且有一个包含 TeamMembership 实体的子清单,该子清单是在TeamMembership 子目录中创建的)。 TeamMembership 数据将写入到 CSV 文件(默认值),这些 CSV 文件将覆盖任何现有数据文件。 TeamMembership 实体定义是从位于以下网址的 CDM CDN 检索的:https://cdm-schema.microsoft.com/logical/core/applicationCommon/TeamMembership.cdm.json

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/Tea
mMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

其他注意事项

Spark 到 CDM 的数据类型映射

向/从 Spark 转换 CDM 时,将应用以下数据类型映射。

Spark CDM
ShortType SmallInteger
IntegerType 整数
LongType BigInteger
DateType Date
时间戳 DateTime(可以选择使用 Time,请参阅下文)
StringType 字符串
DoubleType Double
DecimalType(x,y) Decimal (x,y)(默认的小数位数和精度是 18,4)
FloatType Float
BooleanType 布尔值
ByteType Byte

不支持 CDM Binary 数据类型。

故障排除和已知问题

  • 请确保数据帧中使用的十进制数据类型字段的十进制精度和小数位数与 CDM 实体定义中使用的数据类型匹配 - 需要在该数据类型上定义精度和小数位数特征。 如果未在 CDM 中显式定义精度和小数位数,则使用的默认值为 Decimal(18,4)。 对于 model.json 文件,Decimal 假定为 Decimal(18,4)。
  • 以下选项中的文件夹和文件名不应包含空格或特殊字符(例如"="):manifestPath、entityDefinitionModelRoot、entityDefinitionPath、dataFolderFormat。

不支持的功能

尚不支持以下功能:

  • 最初仅对 CSV 文件支持替代要解释为 CDM Time(而不是 DateTime)的时间戳列。 在更高版本中将添加对将 Time 数据写入到 Parquet 的支持。
  • Parquet Map 类型、基元类型的数组和数组类型的数组当前不受 CDM 支持,因此也不受 Spark CDM 连接器支持。

后续步骤

现在可以查看其他 Apache Spark 连接器: