你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 Azure Synapse Analytics 的 Spark Common Data Model 连接器

Spark Common Data Model 连接器(Spark CDM 连接器)是 Azure Synapse Analytics 中的格式读取器/编写器。 它支持 Spark 程序通过 Spark DataFrame 在 Common Data Model 文件夹中读取和写入 Common Data Model 实体。

有关使用 Common Data Model 1.2 定义 Common Data Model 文档的信息,请参阅本文来了解什么是 Common Data Model 及其用法

功能

连接器在高级别上支持:

  • 3.1、3.2 和 3.3。
  • 将数据从 Common Data Model 文件夹中的实体读入 Spark DataFrame。
  • 根据 Common Data Model 实体定义,从 Spark DataFrame 写入 Common Data Model 文件夹中的实体。
  • 根据 DataFrame 架构,从 Spark DataFrame 写入 Common Data Model 文件夹中的实体。

连接器还支持:

  • 在启用了分层命名空间 (HNS) 的 Azure Data Lake Storage 中读取和写入 Common Data Model 文件夹。
  • 从清单或 model.json 文件描述的 Common Data Model 文件夹进行读取。
  • 写入清单文件描述的 Common Data Model 文件夹。
  • 带有用户可选分隔符字符的 CSV 格式的数据,带或不带列标题均可。
  • 采用 Apache Parquet 格式的数据,包括嵌套的 Parquet。
  • 在读取时使用子清单,以及在写入时选择使用实体范围内的子清单。
  • 通过用户可修改的分区模式写入数据。
  • 在 Azure Synapse Analytics 中使用托管标识和凭据。
  • 通过 config.json 文件中所述的 Common Data Model 适配器定义解析导入中使用的 Common Data Model 别名位置。

限制

连接器不支持以下功能和方案:

  • 并行写入。 我们不建议使用它们。 存储层中不存在锁定机制。
  • 读取实体后,以编程方式访问实体元数据。
  • 写入实体时,以编程方式进行访问来设置或替代元数据。
  • 架构偏差 - 要写入的 DataFrame 中的数据包含实体定义中未包含的额外属性。
  • 架构演变 - 实体分区引用不同版本的实体定义。 可以通过运行 com.microsoft.cdm.BuildInfo.version 来验证此版本。
  • 编写对 model.json 的支持。
  • Time 数据写入 Parquet。 目前,连接器支持替代时间戳列,以将其解释为 Common Data Model Time 值,而不是仅用于 CSV 文件的 DateTime 值。
  • Parquet Map 类型、基元类型的数组和数组类型的数组。 Common Data Model 当前不支持它们,因此 Spark CDM 连接器也不支持它们。

示例

要开始使用连接器,请检查示例代码和 Common Data Model 文件

读取数据

在连接器读取数据时,会根据指定实体的已解析实体定义(已在清单中引用)使用 Common Data Model 文件夹中的元数据创建 DataFrame。 连接器使用实体属性名称作为 DataFrame 列名称。 它将属性数据类型映射到列数据类型。 在加载 DataFrame 时,它会从清单中标识的实体分区内填充。

连接器在指定的清单以及任何一级子清单中查找指定的实体。 如果所需实体在二级或更低级别的子清单中,或者如果在不同的子清单中存在多个同名的实体,则应指定包含所需实体的子清单,而非根清单。

实体分区可以混合采用多种格式(例如 CSV 和 Parquet)。 无论采用何种格式,清单中标识的所有实体数据文件将合并到一个数据集,并加载到 DataFrame 中。

连接器读取 CSV 数据时,默认使用 Spark failfast 选项。 如果列数与实体中的属性数目不相等,则连接器将返回错误。

或者,从 0.19 起,连接器支持宽容模式(仅限 CSV 文件)。 使用宽容模式时,如果 CSV 行的列数少于实体架构列数,则连接器将为缺少的列指定 null 值。 如果 CSV 行的列数多于实体架构的列数,则超出实体架构列计数的列将被截断以等于架构列计数。 用法如下:

.option("mode", "permissive") or .option("mode", "failfast")

写入数据

当连接器写入 Common Data Model 文件夹时,如果该实体尚不存在于该文件夹中,连接器将创建新的实体和定义。 它会将实体和定义添加到 Common Data Model 文件夹,并在清单中引用它们。

连接器支持两种写入模式:

  • 显式写入:物理实体定义基于你指定的逻辑 Common Data Model 实体定义。

    连接器读取并解析指定的逻辑实体定义,以创建 Common Data Model 文件夹中使用的物理实体定义。 如果任何直接或间接引用的 Common Data Model 定义文件中的 import 语句包含别名,则必须提供将这些别名映射到 Common Data Model 适配器和存储位置的 config.json 文件。

    • 如果 DataFrame 架构与引用的实体定义不匹配,则连接器会返回错误。 确保 DataFrame 中的列数据类型与实体中的属性数据类型匹配,包括通过 Common Data Model 中的特征设置的十进制数据、精准率和小数位数。
    • 如果 DataFrame 与实体定义不一致,连接器将返回错误。
    • 如果 DataFrame 是一致的:
      • 如果清单中已存在该实体,连接器会解析提供的实体定义,并针对 Common Data Model 文件夹中的定义对其进行验证。 如果定义不匹配,则连接器将返回错误。 否则,连接器会写入数据并更新清单中的分区信息。
      • 如果 Common Data Model 文件夹中不存在该实体,连接器会将实体定义的已解析副本写入到 Common Data Model 文件夹中的清单。 连接器写入数据并更新清单中的分区信息。
  • 隐式写入:实体定义派生自 DataFrame 结构。

    • 如果 Common Data Model 文件夹中不存在该实体,连接器将使用隐式定义在目标 Common Data Model 文件夹中创建已解析的实体定义。

    • 如果 Common Data Model 文件夹中存在该实体,则连接器将根据现有实体定义验证隐式定义。 如果定义不匹配,则连接器将返回错误。 否则,连接器将写入数据,并将派生的逻辑实体定义写入实体文件夹的子文件夹中。

      连接器将数据写入实体子文件夹中的数据文件夹。 保存模式将确定是将新数据覆盖现有数据或追加到现有数据之后,还是将返回错误(如果数据存在)。 如果数据已存在,则默认会返回错误。

Common Data Model 别名集成

Common Data Model 定义文件在 import 语句中使用别名来简化 import 语句,并允许在运行时后期绑定所导入内容的位置。 使用别名可以:

  • 有利于简化 Common Data Model 文件的组织,以便将位于不同位置的相关 Common Data Model 定义组合到一起。
  • 允许在运行时从不同的已部署位置访问 Common Data Model 内容。

下面的代码片段演示了如何在 Common Data Model 定义文件中的 import 语句内使用别名:

"imports": [  
{     
  "corpusPath": "cdm:/foundations.cdm.json"
},  
{       
  "corpusPath": "core:/TrackedEntity.cdm.json"  
},  
{      
  "corpusPath": "Customer.cdm.json"  
} 
]

前面的示例使用 cdm 作为 Common Data Model 基础文件位置的别名。 它使用 core 作为 TrackedEntity 定义文件位置的别名。

别名是与 Common Data Model config.json 文件中适配器条目内的命名空间值相匹配的文本标签。 适配器条目指定适配器类型(例如 adlsCDNGitHublocal)以及定义位置的 URL。 某些适配器支持其他配置选项,例如连接超时。 尽管别名是任意的文本标签,但 cdm 别名是以一种特殊方式处理的。

Spark CDM 连接器将在实体定义的模型根位置查找要加载的 config.json 文件。 如果 config.json 文件位于某个其他位置,或用户想要替代模型根中的 config.json 文件,则可以使用 configPath 选项提供 config.json 文件的位置。 config.json 文件必须包含正在解析的 Common Data Model 代码中使用的所有别名的适配器条目,否则连接器将报告错误。

替代 config.json 文件的功能意味着可以为 Common Data Model 定义提供运行时可访问的位置。 请确保在运行时引用的内容与最初创作 Common Data Model 时使用的定义一致。

按照约定,cdm 别名是指根级别标准 Common Data Model 定义(包括 foundations.cdm.json 文件)的位置。 此文件包括 Common Data Model 基元数据类型和大多数 Common Data Model 实体定义所需的一组核心特征定义。

通过在 config.json 文件中使用适配器条目,可以像解析任何其他别名一样解析 cdm 别名。 如果未指定适配器或提供了 null 条目,则 cdm 别名默认解析为位于 https://cdm-schema.microsoft.com/logical/ 的 Common Data Model 公共内容分发网络 (CDN)。

还可以使用 cdmSource 选项替代 cdm 别名的解析方式。 如果 cdm 别名是要解析的 Common Data Model 定义中使用的唯一别名,则使用 cdmSource 选项会非常有用,因为这样可以无需创建或引用 config.json 文件。

参数、选项和保存模式

对于读取和写入,请提供 Spark CDM 连接器的库名称作为参数。 可使用一组选项将连接器的行为参数化。 编写时,连接器还会支持保存模式。

连接器库名称、选项和保存模式的格式如下:

  • dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
  • dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.\<saveMode\>)

以下示例演示了使用连接器进行读取的一些选项:

val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "customerleads/default.manifest.cdm.json")
  .option("entity", "Customer")
  .load()

常用读取和写入选项

以下选项标识要读取或写入的 Common Data Model 文件夹中的实体。

选项 说明 模式和示例用法
storage 启用了 HNS 的 Azure Data Lake Storage 帐户的终结点 URL,其中包含 Common Data Model 文件夹。
使用 dfs.core.windows.net URL。
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net"
manifestPath 存储帐户中清单或 model.json 文件的相对路径。 对于读取,该选项可以是根清单、子清单或 model.json 文件。 对于写入,则必须是根清单。 <container>/{<folderPath>}<manifestFileName>
"mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json"
"models/hr/employees/model.json"(只读)
entity 清单中源或目标实体的名称。 首次在文件夹中写入实体时,将为解析的实体定义指定此名称。 实体名称区分大小写。 <entityName>
"customer"
maxCDMThreads 连接器解析实体定义时的最大并发读取数。 任何有效的整数,例如 5

注意

读取时,除 Common Data Model 文件夹中的物理实体定义以外,不再需要指定逻辑实体定义。

显式写入选项

以下选项标识了用于要写入的实体的逻辑实体定义。 逻辑实体定义将解析为定义实体写入方式的物理定义。

选项 说明 模式或示例用法
entityDefinitionStorage 包含实体定义的 Azure Data Lake Storage 帐户。 如果它不同于托管 Common Data Model 文件夹的存储帐户,则为必需。 <accountName>.dfs.core.windows.net
"myAccount.dfs.core.windows.net"
entityDefinitionModelRoot 模型根或语料库在帐户中的位置。 <container>/<folderPath>
"crm/core"
entityDefinitionPath 实体的位置。 它是相对于模型根的 Common Data Model 定义文件的文件路径,包括该文件中实体的名称。 <folderPath>/<entityName>.cdm.json/<entityName>
"sales/customer.cdm.json/customer"
configPath config.json 文件的容器和文件夹路径,该文件包含实体定义文件及任何直接或间接引用的 Common Data Model 文件中包含的所有别名的适配器配置。

如果 config.json 位于模型根文件夹中,则此选项不是必需的。
<container><folderPath>
useCdmStandardModelRoot 指示模型根位于 https://cdm-schema.microsoft.com/CDM/logical/。 用于引用 Common Data Model CDN 中定义的实体类型。 替代 entityDefinitionStorageentityDefinitionModelRoot(如果已指定)。
"useCdmStandardModelRoot"
cdmSource 定义如何解析 cdm 别名(如果别名存在于 Common Data Model 定义文件中)。 如果使用此选项,它将替代 config.json 文件中指定的任何 cdm 适配器。 值为 builtinreferenced。 默认值是 referenced

如果将此选项设置为 referenced,连接器将使用 https://cdm-schema.microsoft.com/logical/ 中最新发布的标准 Common Data Model 定义。 如果将此选项设置为 builtin,连接器将使用内置于连接器正在使用的 Common Data Model 对象模型中的 Common Data Model 基本定义。

请注意:
* Spark CDM 连接器可能使用的不是最新的Common Data Model SDK,因此可能不包含最新发布的标准定义。
* 内置定义仅包括顶级 Common Data Model 内容,例如 foundations.cdm.jsonprimitives.cdm.json。 如果要使用较低级别的标准 Common Data Model 定义,请使用 referenced 或在 config.json 包含 cdm 适配器。
"builtin"|"referenced"

在前面的示例中,客户实体定义对象的完整路径是 https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customer。 在该路径中,模型是 Azure Data Lake Storage 中的容器。

显式写入选项

如果在写入时未指定逻辑实体定义,则将基于 DataFrame 架构隐式写入实体。

当隐式写入时,时间戳列通常被解释为 Common Data Model DateTime 数据类型。 可以通过提供与指定数据类型的列关联的元数据对象来替代此解释,以创建 Common Data Model Time 数据类型的属性。 有关详细信息,请参阅本文后面的处理 Common Data Model 时间数据

仅 CSV 文件支持写入时间数据。 该支持目前未扩展到 Parquet。

文件夹结构和数据格式选项

可以使用以下选项更改文件夹组织和文件格式。

选项 说明 模式或示例用法
useSubManifest 如果为 true,则会导致通过子清单将目标实体包含在根清单中。 子清单和实体定义将写入到根下的一个实体文件夹中。 默认值为 false "true"|"false"
format 定义文件格式。 当前支持的文件格式为 CSV 和 Parquet。 默认值为 csv "csv"|"parquet"
delimiter 仅 CSV。 定义正在使用的分隔符。 默认值为逗号。 "|"
columnHeaders 仅 CSV。 如果为 true,则向带有列标题的数据文件添加第一行。 默认值为 true "true"|"false"
compression 仅写入。 仅限 Parquet。 定义正在使用的压缩格式。 默认值为 snappy "uncompressed" | "snappy" | "gzip" | "lzo"
dataFolderFormat 允许在实体文件夹内使用可由用户定义的数据文件夹结构。 允许使用 DateTimeFormatter 格式设置将日期和时间值替换为文件夹名称。 非格式化程序内容必须放在单引号中。 默认格式为 "yyyy"-"MM"-"dd",这会生成文件夹名称,如 2020-07-30 year "yyyy" / month "MM"
"Data"

保存模式

保存模式指定在编写 DataFrame 时连接器如何处理 Common Data Model 文件夹中的现有实体数据。 选项包括覆盖、追加,或在数据已存在时返回错误。 默认的保存模式为 ErrorIfExists

模式 说明
SaveMode.Overwrite 如果它发生更改,则覆盖现有的实体定义,并将现有数据分区替换为要写入的数据分区。
SaveMode.Append 将要写入的数据追加到现有分区旁边的新分区中。

此模式不支持更改架构。 如果正在写入的数据的架构与现有实体定义不兼容,连接器将引发错误。
SaveMode.ErrorIfExists 如果分区已存在,则会返回错误。

有关如何在写入时命名和组织数据文件的详细信息,请参阅本文后面的文件夹与文件命名和组织部分。

身份验证

可以通过 Spark CDM 连接器使用三种身份验证模式来读取或写入 Common Data Model 元数据和数据分区:凭据传递、共享访问签名 (SAS) 令牌以及应用注册。

凭据直通身份验证

在 Azure Synapse Analytics 中,Spark CDM 连接器支持使用 Azure 资源的托管标识来协调对包含 Common Data Model 文件夹的 Azure Data Lake Storage 帐户的访问。 托管标识是为每个 Azure Synapse Analytics 工作区自动创建的。 连接器使用工作区的托管标识,该工作区包含一个笔记本,其中会调用连接器以便向存储帐户进行身份验证。

必须确保所选标识有权访问相应的存储帐户:

  • 授予存储 Blob 数据参与者权限,以允许库写入 Common Data Model 文件夹。
  • 授予存储 Blob 数据读者权限以仅允许读取访问。

在这两种情况下,均不需要额外的连接器选项。

适用于基于 SAS 令牌的访问控制的选项

SAS 令牌凭据是用于向存储帐户进行身份验证的额外选项。 使用 SAS 令牌身份验证时,SAS 令牌可以位于容器或文件夹级别。 需要适当的权限:

  • 清单或分区的读取权限只需要读取级别支持。
  • 写入权限需要读取和写入支持。
选项 说明 模式和示例用法
sasToken 用于使用正确权限访问相对存储帐户的 SAS 令牌 <token>

适用于基于凭据的访问控制的选项

作为使用托管标识或用户标识的替代方法,可以提供显式凭据来支持 Spark CDM 连接器访问数据。 在 Microsoft Entra ID 中创建应用注册。 然后使用以下任一角色授予此应用注册对存储帐户的访问权限:

  • 存储 Blob 数据参与者,可允许库写入 Common Data Model 文件夹
  • 存储 Blob 数据读者,仅允许读取权限

创建权限后,可以使用以下选项在每次调用连接器时将应用 ID、应用密钥和租户 ID 传递到该连接器。 建议使用 Azure 密钥保管库来存储这些值,以确保它们不会以明文形式存储在笔记本文件中。

选项 说明 模式和示例用法
appId 用于向存储帐户进行身份验证的应用注册 ID <guid>
appKey 已注册的应用密钥或机密 <encrypted secret>
tenantId 用于注册应用的 Microsoft Entra 租户 ID <guid>

示例

以下示例都使用了 appIdappKeytenantId 变量。 基于 Azure 应用注册,你在代码的前面初始化了这些变量:存储 Blob 数据参与者对存储的写入权限,以及存储 Blob 数据读者的读取权限。

读取

此代码从包含 mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json 中清单的 Common Data Model 文件夹读取 Person 实体:

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

仅使用 DataFrame 架构进行隐式写入

以下代码将 df DataFrame 写入 Common Data Model 文件夹,其中包含带有事件实体的 mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json 的清单。

该代码将事件数据写入作为 Parquet 文件写入,使用 gzip压缩它,并将其追加到文件夹。 (代码会添加新文件而不删除现有文件。)


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

使用存储在 Data Lake Storage 中的实体定义显式写入

以下代码将 df DataFrame 写入 Common Data Model 文件夹,其中包含一个带有 Person 实体的位于 https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json 的清单。 默认情况下,代码将人员数据作为新的 CSV 文件写入,该新文件会覆盖文件夹中的现有文件。

代码会从 https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json 中检索 Person 实体定义。

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

使用 Common Data Model GitHub 存储库中定义的实体显式写入

以下代码会将 df DataFrame 写入 Common Data Model 文件夹,其中包含:

  • 位于 https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json 的清单。
  • 包含在 TeamMembership 子目录中创建的 TeamMembership 实体的子清单。

TeamMembership 数据将写入到 CSV 文件(默认设置),这些 CSV 文件会覆盖任何现有数据文件。 代码会从位于 https://cdm-schema.microsoft.com/logical/core/applicationCommon/TeamMembership.cdm.json 的 Common Data Model CDN 中检索 TeamMembership 实体定义。

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

其他注意事项

将数据类型从 Spark 映射到 Common Data Model

在将 Common Data Model 转换为 Spark 或从后者转换为前者时,连接器会应用以下数据类型映射。

Spark 常见数据模型
ShortType SmallInteger
IntegerType Integer
LongType BigInteger
DateType Date
Timestamp DateTime(可以选择 Time
StringType String
DoubleType Double
DecimalType(x,y) Decimal (x,y)(默认小数位数和精准率为 18,4
FloatType Float
BooleanType Boolean
ByteType Byte

连接器不支持 Common Data Model Binary 数据类型。

处理 Common Data Model Date、DateTime 和 DateTimeOffset 数据

Spark CDM 连接器像平常处理 Spark 和 Parquet 一样处理 Common Data Model DateDateTime 数据类型。 在 CSV 中,连接器以 ISO 8601 格式读取和写入这些数据类型。

连接器会将 Common Data Model DateTime 数据类型值解释为 UTC。 在 CSV 中,连接器以 ISO 8601 格式写入这些值。 例如 2020-03-13 09:49:00Z

在 Spark 和 Parquet 中,用于记录本地时间瞬间的 Common Data Model DateTimeOffset 值的处理方式与 CSV 不同。 CSV 和其他格式可以将本地时间瞬间表示为包含日期/时间的结构,例如 2020-03-13 09:49:00-08:00。 Parquet 和 Spark 则不支持此类结构。 相反,它们使用的是 TIMESTAMP 数据类型,允许以 UTC(或在某个未指定的时区中)形式记录瞬间。

Spark CDM 连接器会将 CSV 中 DateTimeOffset 值转换为 UTC 时间戳。 此值将在 Parquet 中保留为时间戳。 如果将该值稍后保存到 CSV 中,它将序列化为偏移量为 +00:00 的 DateTimeOffset 值。 时间准确度将不会受到任何损失。 尽管偏移量丢失,但序列化值可表示与原始值相同的瞬间。

Spark 系统使用其系统时间作为基线,并且通常使用本地时间来表示时间。 UTC 时间则始终可以通过应用本地系统偏移量来计算得出。 对于所有区域的 Azure 系统,系统时间始终为 UTC,因此所有时间戳值通常都采用 UTC 格式。 使用 Common Data Model 定义派生自 DataFrame 的隐式写入时,时间戳列将转换为具有 Common Data Model DateTime 数据类型的属性,这就意味着 UTC 时间。

如果保留本地时间很重要,并且数据将在 Spark 中处理或以 Parquet 格式保留,则建议使用 DateTime 属性,并在单独的属性中保留偏移量。 例如,可以将偏移量保留为表示分钟的带符号整数值。 在 Common Data Model 中,DateTime 值将采用 UTC 格式,因此必须应用偏移量来计算本地时间。

在大多数情况下,保留本地时间并不重要。 通常仅在方便用户使用时,UI 才需要使用本地时间,并且本地时间将基于用户所在的时区,因此不存储 UTC 时间通常是更好的解决方案。

处理 Common Data Model 时间数据

Spark 不支持显式 Time 数据类型。 具有 Common Data Model Time 数据类型的属性在 Spark DataFrame 中表示为带有 Timestamp 数据类型的列。 当 Spark CDM 连接器读取时间值时,DataFrame 中的时间戳将初始化为 Spark 纪元日期 01/01/1970 加上从源读取的时间值。

使用显式写入时,可以将时间戳列映射到 DateTimeTime 属性。 如果将时间戳映射到 Time 属性,则会去除时间戳的日期部分。

使用隐式写入时,时间戳列会默认映射到 DateTime 属性。 要将时间戳列映射到 Time 属性,必须将元数据对象添加到 DataFrame 中的列,以指示应将时间戳解释为时间值。 以下代码演示了如何在 Scala 中执行此操作:

val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))

时间值准确度

Spark CDM 连接器支持 DateTimeTime 格式的时间值。 根据要读取的文件(CSV 或 Parquet)中的数据格式,或 DataFrame 中定义的数据格式,秒最多可保留六个小数位数。 使用六个小数位数可实现从单秒到微秒的准确度。

命名和组织文件夹和文件

写入 Common Data Model 文件夹时,可使用默认的文件夹组织方式。 默认情况下,日期文件会写入到为当前日期创建的文件夹中,命名方式如 2010-07-31。 可以使用 dateFolderFormat 选项自定义文件夹结构和名称。

数据文件名称基于以下模式:<entity>-<jobid>-*.<fileformat>。

可以使用 sparkContext.parallelize() 方法控制要写入的数据分区数。 分区数由 Spark 群集中的执行程序数确定,或显示指定。 以下 Scala 示例创建了具有两个分区的 DataFrame:

val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)

下面是由引用的实体定义所定义的显示写入示例:

+-- <CDMFolder>
     |-- default.manifest.cdm.json     << with entity reference and partition info
     +-- <Entity>
          |-- <entity>.cdm.json        << resolved physical entity definition
          |-- <data folder>
          |-- <data folder>
          +-- ...                            

下面是带有子清单的显示写入示例:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
         |-- <entity>.cdm.json
         |-- <entity>.manifest.cdm.json << submanifest with partition info
         |-- <data folder>
         |-- <data folder>
         +-- ...

下面是隐式写入示例,其中实体定义派生自 DataFrame 架构:

+-- <CDMFolder>
    |-- default.manifest.cdm.json
    +-- <Entity>
         |-- <entity>.cdm.json          << resolved physical entity definition
         +-- LogicalDefinition
         |   +-- <entity>.cdm.json      << logical entity definitions
         |-- <data folder>
         |-- <data folder>
         +-- ...

下面是带有子清单的隐式写入示例:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
        |-- <entity>.cdm.json           << resolved physical entity definition
        |-- <entity>.manifest.cdm.json  << submanifest with reference to the entity and partition info
        +-- LogicalDefinition
        |   +-- <entity>.cdm.json       << logical entity definitions
        |-- <data folder>
        |-- <data folder>
        +-- ...

故障排除和已知问题

  • 确保 DataFrame 中所使用的的十进制数据类型的十进制精度和小数位数与 Common Data Model 实体定义中的数据类型相匹配。 如果 Common Data Model 中未显示定义精准率和小数位数,则默认为 Decimal(18,4)。 对于 model.json 文件,Decimal 会假定为 Decimal(18,4)
  • 以下选项中的文件夹和文件名不应包含空格或特殊字符,例如等号 (=):manifestPathentityDefinitionModelRootentityDefinitionPathdataFolderFormat

后续步骤

现在可以查看其他 Apache Spark 连接器: