Compartir a través de


Conector de Common Data Model de Spark para Azure Synapse Analytics

El conector de Common Data Model de Spark (conector CDM de Spark) es un lector o escritor de formato en Azure Synapse Analytics. Permite a un programa de Spark leer y escribir entidades de Common Data Model en una carpeta Common Data Model a través de DataFrames de Spark.

Para obtener información sobre cómo definir documentos de Common Data Model mediante Common Data Model 1.2, consulte este artículo sobre qué es Common Data Model y cómo usarlo.

Funcionalidades

En un nivel alto, el conector admite:

  • 3.1 y 3.2., y 3.3.
  • Lectura de datos de una entidad de una carpeta de Common Data Model en un DataFrame de Spark.
  • Escritura desde un DataFrame de Spark en una entidad de una carpeta Common Data Model basada en una definición de entidad de Common Data Model.
  • Escritura desde un DataFrame de Spark en una entidad de una carpeta de Common Data Model basada en el esquema del DataFrame.

El conector también admite:

  • Lectura y escritura en carpetas de Common Data Model en Azure Data Lake Storage con un espacio de nombres jerárquico (HNS) habilitado.
  • Admite la lectura desde carpetas de Common Data Model descritas por archivos de manifiesto o model.json.
  • Escribir en carpetas de Common Data Model descritas por un archivo de manifiesto.
  • Datos en formato CSV con o sin encabezados de columna, y con caracteres delimitadores seleccionables por el usuario.
  • Datos en formato Apache Parquet, incluido Parquet anidado.
  • Submanifiestos en lectura, y el uso opcional de submanifiestos con ámbito de entidad en la escritura.
  • Escribir datos a través de patrones de partición modificables por el usuario.
  • Uso de identidades administradas y credenciales en Azure Synapse Analytics.
  • Resolución de las ubicaciones de alias de Common Data Model usadas en las importaciones a través de definiciones del adaptador de Common Data Model descritas en un archivo config.json.

Limitaciones

El conector no admite las siguientes funcionalidades y escenarios:

  • Escrituras paralelas. No las recomendamos. No hay ningún mecanismo de bloqueo en la capa de almacenamiento.
  • Acceso mediante programación a los metadatos de entidad después de leer una entidad.
  • Acceso mediante programación para establecer o reemplazar los metadatos al escribir una entidad.
  • Desfase de esquema, donde los datos de un DataFrame que se escriben incluyen atributos adicionales no incluidos en la definición de la entidad.
  • Evolución del esquema, donde las particiones de la entidad hacen referencia a distintas versiones de la definición de la entidad. Para verificarlo, ejecute com.microsoft.cdm.BuildInfo.version.
  • Compatibilidad de escritura para model.json.
  • Escribir Time datos en Parquet. Actualmente, el conector admite la invalidación de una columna de marca de tiempo para que se interprete como un valor de Common Data Model Time en lugar de un DateTime valor solo para archivos CSV.
  • El tipo Parquet Map, matrices de tipos primitivos y matrices de tipos de matriz. Common Data Model no las admite actualmente, por lo que tampoco el conector CDM de Spark.

Ejemplos

Para empezar a usar el conector, consulte el código de ejemplo y los archivos de Common Data Model.

Lectura de datos

Cuando el conector lee datos, usa metadatos en la carpeta de Common Data Model para crear el DataFrame en función de la definición de la entidad resuelta para la entidad especificada, como se hace referencia en el manifiesto. El conector usa nombres de atributo de entidad como nombres de columna DataFrame. Asigna tipos de datos de atributo a tipos de datos de columna. Cuando se carga el DataFrame, se rellena a partir de las particiones de entidad identificadas en el manifiesto.

El conector busca la entidad especificada en el manifiesto especificado y en los submanifiestos de primer nivel. Si la entidad necesaria está en un submanifiesto de segundo nivel o inferior, o si hay varias entidades del mismo nombre en submanifiestos diferentes, debe especificar el submanifiesto que contiene la entidad necesaria en lugar del manifiesto raíz.

Las particiones de entidad pueden estar en una combinación de formatos (por ejemplo, CSV y Parquet). Todos los archivos de datos de entidad identificados en el manifiesto se combinan en un conjunto de datos, independientemente del formato, y se cargan en el DataFrame.

Cuando el conector lee datos CSV, usa la opción Spark failfast de forma predeterminada. Si el número de columnas no es igual al número de atributos de la entidad, el conector devolverá un error.

Como alternativa, a partir de 0.19, el conector admite el modo permisivo (solo para archivos CSV). Con el modo permisivo, cuando una fila CSV tiene un número menor de columnas que el esquema de la entidad, el conector asigna valores null en las columnas que faltan. Cuando una fila CSV tiene más columnas que el esquema de la entidad, las columnas mayores que el recuento de columnas del esquema de la entidad se truncan en el recuento de columnas del esquema. Se usa de esta manera:

.option("mode", "permissive") or .option("mode", "failfast")

Escritura de datos

Cuando el conector escribe en una carpeta Common Data Model, si la entidad aún no existe en esa carpeta, el conector crea una nueva entidad y definición. Agrega la entidad y la definición a la carpeta Common Data Model y las hace referencia en el manifiesto.

El conector admite dos modos de escritura:

  • Escritura explícita: la definición de entidad física se basa en una definición de entidad lógica de Common Data Model que especifique.

    El conector lee y resuelve la definición de entidad lógica especificada para crear la definición de entidad física que se usa en la carpeta Common Data Model. Si las instrucciones de importación en cualquier archivo de definición de Common Data Model al que se hace referencia directa o indirectamente incluyen alias, debe proporcionar un archivo config.json que asigne estos alias a adaptadores y ubicaciones de almacenamiento de Common Data Model.

    • Si el esquema DataFrame no coincide con la definición de entidad a la que se hace referencia, el conector devuelve un error. Asegúrese de que los tipos de datos de columna del DataFrame coincidan con los tipos de datos del atributo de la entidad, incluidos los datos decimales, la precisión y el conjunto de escalado a través de rasgos en Common Data Model.
    • Si el DataFrame es incoherente con la definición de entidad, el conector devuelve un error.
    • Si el DataFrame es coherente:
      • Si la entidad ya existe en el manifiesto, el conector resuelve la definición de entidad proporcionada y la valida con la definición de la carpeta Common Data Model. Si las definiciones no coinciden, el conector devuelve un error. De lo contrario, el conector escribe datos y actualiza la información de partición en el manifiesto.
      • Si la entidad no existe en la carpeta Common Data Model, el conector escribe una copia resuelta de la definición de entidad en el manifiesto en la carpeta Common Data Model. El conector escribe datos y actualiza la información de partición en el manifiesto.
  • Escritura implícita: la definición de la entidad se deriva de la estructura del DataFrame.

    • Si la entidad no existe en la carpeta Common Data Model, el conector usa la definición implícita para crear la definición de entidad resuelta en la carpeta Common Data Model de destino.

    • Si la entidad existe en la carpeta de Common Data Model, el conector valida la definición implícita frente a la definición de la entidad existente. Si las definiciones no coinciden, el conector devuelve un error. De lo contrario, el conector escribe datos y escribe definiciones de entidad lógica derivadas en una subcarpeta de la carpeta de entidades.

      El conector escribe datos en carpetas de datos dentro de una subcarpeta de entidad. Un modo de guardado determina si los nuevos datos se sobrescriben o se anexan a los datos existentes, o se devuelve un error si existen datos. El comportamiento predeterminado es devolver un error si ya existen datos.

Integración de alias de Common Data Model

Los archivos de definición de Common Data Model usan alias en instrucciones de importación para simplificar la instrucción de importación y permitir que la ubicación del contenido importado esté enlazada en tiempo de ejecución. Usar alias:

  • Permite la organización sencilla de los archivos de Common Data Model para que las definiciones de Common Data Model relacionadas se puedan agrupar en distintas ubicaciones.
  • Permite acceder al contenido de Common Data Model desde diferentes ubicaciones implementadas en tiempo de ejecución.

El siguiente fragmento de código muestra el uso de alias en instrucciones de importación en un archivo de definición de Common Data Model:

"imports": [  
{     
  "corpusPath": "cdm:/foundations.cdm.json"
},  
{       
  "corpusPath": "core:/TrackedEntity.cdm.json"  
},  
{      
  "corpusPath": "Customer.cdm.json"  
} 
]

En el ejemplo anterior se usa cdm como alias para la ubicación del archivo de fundamentos de Common Data Model. Usa core como alias para la ubicación del archivo de definición TrackedEntity.

Los alias son etiquetas de texto que coinciden con un valor de espacio de nombres en una entrada de adaptador en un archivo config.json de Common Data Model. Una entrada de adaptador especifica el tipo de adaptador (por ejemplo, adls, CDN, GitHub, o local) y una dirección URL que determina la ubicación. Algunos adaptadores admiten otras opciones de configuración, como un tiempo de espera de conexión. Mientras que los alias son etiquetas de texto arbitrarias, el alias cdm se trata de una manera especial.

El conector de CDM de Spark busca en la ubicación raíz del modelo de definición de la entidad el archivo config.json que se va a cargar. Si el archivo config.json se encuentra en otra ubicación o el usuario busca reemplazar el archivo config.json en la raíz del modelo, puede proporcionar la ubicación de un archivo config.json mediante la opción configPath. El archivo config.json debe contener entradas de adaptador para todos los alias usados en el código de Common Data Model que se está resolviendo o el conector notifica un error.

La capacidad de invalidar el archivo config.json significa que puede proporcionar ubicaciones accesibles en tiempo de ejecución para las definiciones de Common Data Model. Asegúrese de que el contenido al que se hace referencia en tiempo de ejecución sea coherente con las definiciones usadas cuando se creó originalmente el Common Data Model.

Por convención, el cdm alias hace referencia a la ubicación de las definiciones de Common Data Model estándar de nivel raíz, incluido el archivo foundations.cdm.json. Este archivo incluye los tipos de datos primitivos de Common Data Model y un conjunto básico de definiciones de rasgos necesarias para la mayoría de las definiciones de entidad de Common Data Model.

Puede resolver el alias cdm como cualquier otro alias, mediante una entrada de adaptador en el archivo config.json. Si no especifica un adaptador o proporciona una entrada nula, el alias cdm se resuelve de forma predeterminada en la red de entrega de contenido público (CDN) de Common Data Model en https://cdm-schema.microsoft.com/logical/.

También puede usar la opción cdmSource para invalidar cómo se resuelve el aliascdm. Usar la opción cdmSource es útil si el alias cdm es el único alias usado en las definiciones del Common Data Model que se van a resolver, ya que puede evitar la necesidad de crear o hacer referencia a un archivo config.json.

Parámetros, opciones y modo de guardado

En el caso de las lecturas y escrituras, proporcione el nombre de biblioteca del conector CDM de Spark como parámetro. Use un conjunto de opciones para parametrizar el comportamiento del conector. Al escribir, el conector también admite un modo de guardado.

El nombre de la biblioteca del conector, las opciones y el modo de guardado tienen el formato siguiente:

  • dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
  • dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.\<saveMode\>)

Este es un ejemplo que muestra algunas de las opciones de uso del conector para lecturas:

val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "customerleads/default.manifest.cdm.json")
  .option("entity", "Customer")
  .load()

Opciones comunes de lectura y escritura

Las siguientes opciones identifican la entidad en la carpeta Common Data Model en la que está leyendo o escribiendo.

Opción Descripción Patrón y ejemplo de uso
storage Dirección URL del punto de conexión de la cuenta de Azure Data Lake Storage, con HNS habilitado, que contiene la carpeta Common Data Model.
Use la dirección URL dfs.core.windows.net.
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net"
manifestPath Ruta de acceso relativa al archivo de manifiesto o model.json de la cuenta de almacenamiento. Para la lectura, puede ser un manifiesto raíz, un submanifiesto o un archivo model.json. Para la escritura, debe ser un manifiesto raíz. <container>/{<folderPath>}<manifestFileName>,
"mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json"
"models/hr/employees/model.json" (solo lectura)
entity Nombre de la entidad de origen o de destino en el manifiesto. Al escribir una entidad por primera vez en una carpeta, el conector le da este nombre a la definición de la entidad resuelta. El nombre de la entidad distingue mayúsculas de minúsculas. <entityName>
"customer"
maxCDMThreads Número máximo de lecturas simultáneas mientras el conector resuelve la definición de una entidad. Cualquier entero válido, como 5

Nota

Ya no es necesario especificar una definición de entidad lógica además de la definición de entidad física en la carpeta de Common Data Model durante la lectura.

Opciones de escritura explícitas

Las siguientes opciones identifican la definición de la entidad lógica para la entidad que se va a escribir. La definición de la entidad lógica se resuelve en una definición física que define cómo se escribe la entidad.

Opción Descripción Patrón o ejemplo de uso
entityDefinitionStorage La cuenta Azure Data Lake Storage que contiene la definición de entidad. Es necesario si es diferente de la cuenta de almacenamiento que hospeda la carpeta Common Data Model. <accountName>.dfs.core.windows.net
"myAccount.dfs.core.windows.net"
entityDefinitionModelRoot Ubicación de la raíz o corpus del modelo dentro de la cuenta. <container>/<folderPath>
"crm/core"
entityDefinitionPath La ubicación de la entidad. Es la ruta de acceso del archivo de definición de Common Data Model relativa a la raíz del modelo, incluido el nombre de la entidad en ese archivo. <folderPath>/<entityName>.cdm.json/<entityName>
"sales/customer.cdm.json/customer"
configPath El contenedor y la ruta de acceso de la carpeta a un archivo config.json que contiene las configuraciones del adaptador para todos los alias incluidos en el archivo de definiciones de entidades y cualquier archivo del Common Data Model al que se haga referencia directa o indirectamente.

Esta opción no es obligatoria si config.json está en la carpeta raíz del modelo.
<container><folderPath>
useCdmStandardModelRoot Indica que la raíz del modelo se encuentra en https://cdm-schema.microsoft.com/CDM/logical/. Se usa para hacer referencia a los tipos de entidad definidos en la red CDN del Common Data Model. Invalida entityDefinitionStorage y entityDefinitionModelRoot (si se especifica).
"useCdmStandardModelRoot"
cdmSource Define cómo se resuelve el cdm alias (si está presente en los archivos de definición de Common Data Model). Si usa esta opción, reemplaza cualquier adaptador de cdmespecificado en el archivo config.json. Los valores son builtin o referenced. El valor predeterminado es referenced.

Si establece esta opción en referenced, el conector usa las definiciones de Common Data Model estándar más recientes publicadas en https://cdm-schema.microsoft.com/logical/. Si establece esta opción en builtin, el conector usa las definiciones base de Common Data Model integradas en el modelo de objetos de Common Data Model que usa el conector.

Nota:
* Es posible que el conector de Spark de CDM no use el SDK de Common Data Model más reciente, por lo que es posible que no contenga las definiciones estándar publicadas más recientes.
* Las definiciones integradas incluyen solo el contenido de Common Data Model de nivel superior, como foundations.cdm.json o primitives.cdm.json. Si desea usar definiciones de Common Data Model estándar de nivel inferior, use referenced o incluya un adaptador cdm en config.json.
"builtin"|"referenced"

En el ejemplo anterior, la ruta de acceso completa al objeto de definición de entidad de cliente es https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customer. En esa ruta de acceso, los modelos son el contenedor de Azure Data Lake Storage.

Opciones de escritura implícitas

Si no especifica una definición de entidad lógica durante la escritura, la entidad se escribirá implícitamente, en función del esquema del DataFrame.

Al escribir implícitamente, una columna de marca de tiempo normalmente se interpreta como un tipo de datos DateTime de Common Data Model. Puede invalidar esta interpretación para crear un atributo del tipo de datos Time del Common Data Model proporcionando un objeto de metadatos asociado a la columna que especifica el tipo de datos. Para obtener más información, consulte Control de los datos de tiempo de Common Data Model más adelante en este artículo.

La compatibilidad con los datos de tiempo de escritura solo existe para archivos CSV. Esa compatibilidad actualmente no se extiende a Parquet.

Estructura de carpetas y opciones de formato de datos

Puede usar las siguientes opciones para cambiar el formato de archivo y la organización de carpetas.

Opción Descripción Patrón o ejemplo de uso
useSubManifest Si es true, hace que la entidad de destino se incluya en el manifiesto raíz a través de un submanifiesto. El submanifiesto y la definición de la entidad se escriben en una carpeta de entidades debajo de la raíz. El valor predeterminado es false. "true"|"false"
format Define el formato de archivo. Los formatos de archivo compatibles en la actualidad son CSV y Parquet. El valor predeterminado es csv. "csv"|"parquet"
delimiter Solo CSV. Define el delimitador que está usando. El valor predeterminado es la coma. "|"
columnHeaders Solo CSV. Si es true, agregará una primera fila a los archivos de datos con encabezados de columna. El valor predeterminado es true. "true"|"false"
compression Solo escritura. Solo Parquet. Define el formato de compresión que está usando. El valor predeterminado es snappy. "uncompressed" | "snappy" | "gzip" | "lzo"
dataFolderFormat Permite una estructura de carpetas de datos definibles por el usuario dentro de una carpeta de entidades. Permite sustituir los valores de fecha y hora en nombres de carpeta mediante el formato DateTimeFormatter. El contenido que no pertenece al formateador se debe incluir entre comillas simples. El formato predeterminado es "yyyy"-"MM"-"dd", que genera nombres de carpeta como 2020-07-30. year "yyyy" / month "MM"
"Data"

Modo de guardado

El modo de guardado especifica cómo el conector controla los datos de entidad existentes en la carpeta Common Data Model al escribir un DataFrame. Las opciones son sobrescribir, anexar o devolver un error, si los datos ya existen. El modo de guardado predeterminado es ErrorIfExists.

Modo Descripción
SaveMode.Overwrite Sobrescribe la definición existente de la entidad si se ha cambiado, y reemplazará las particiones de datos existentes por las particiones de datos que se van a escribir.
SaveMode.Append Anexa los datos que se escriben en nuevas particiones junto con las particiones existentes.

Este modo no admite el cambio del esquema. Si el esquema de los datos que se escriben no es compatible con la definición de entidad existente, el conector produce un error.
SaveMode.ErrorIfExists Devuelve un error si ya existen particiones.

Para obtener más información sobre cómo se denominan y organizan los archivos de datos en escritura, consulte la sección Nomenclatura de archivos y carpetas y organización más adelante en este artículo.

Authentication

Puede usar tres modos de autenticación con el conector de CDM de Spark para leer o escribir los metadatos de Common Data Model y las particiones de datos: paso a través de credenciales, token de firma de acceso compartido (SAS) y registro de aplicaciones.

Acceso directo a credenciales

En Azure Synapse Analytics, el conector de CDM de Spark admite el uso de identidades administradas para recursos de Azure para mediar el acceso a la cuenta de Azure Data Lake Storage que contiene la carpeta de Common Data Model. Se crea automáticamente una identidad administrada para cada área de trabajo de Azure Synapse Analytics. El conector usa la identidad administrada del área de trabajo que contiene el cuaderno en el que se llama al conector para autenticarse en las cuentas de almacenamiento.

Debe asegurarse de que la identidad elegida tenga acceso a las cuentas de almacenamiento adecuadas:

  • Conceda permisos de Colaborador de datos de blob de almacenamiento para permitir que la biblioteca escriba en carpetas de Common Data Model.
  • Conceda permisos de Lector de datos de blob de almacenamiento para permitir solo el acceso de lectura.

En ambos casos, no se requieren opciones de conector adicionales.

Opciones para el control de acceso basado en tokens de SAS

Las credenciales de token de SAS son una opción adicional para la autenticación en cuentas de almacenamiento. Con la autenticación mediante token de SAS, este puede estar en el nivel de contenedor o carpeta. Se requieren los permisos adecuados:

  • Los permisos de lectura para un manifiesto o partición solo necesitan compatibilidad de nivel de lectura.
  • Los permisos de escritura necesitan compatibilidad con lectura y escritura.
Opción Descripción Patrón y ejemplo de uso
sasToken Token de SAS para acceder a la cuenta de almacenamiento relativa con los permisos correctos <token>

Opciones de control de acceso basado en credenciales

Como alternativa al uso de una identidad administrada o una identidad de usuario, puede proporcionar credenciales explícitas para permitir que el conector de CDM para Spark acceda a los datos. En Microsoft Entra ID cree un registro de la aplicación. A continuación, conceda a esta aplicación acceso a la cuenta de almacenamiento mediante cualquiera de los roles siguientes:

  • Colaborador de datos de blob de almacenamiento para permitir que la biblioteca escriba en carpetas de Common Data Model
  • Lector de datos de blob de almacenamiento para permitir solo permisos de lectura

Después de crear los permisos, puede pasar el identificador de la aplicación, la clave de la aplicación y el identificador del inquilino al conector en cada llamada a él mediante las opciones siguientes. Le recomendamos usar Azure Key Vault para almacenar estos valores para asegurarse de que no se almacenen en texto sin formato en el archivo del cuaderno.

Opción Descripción Patrón y ejemplo de uso
appId El identificador del registro de aplicación para la autenticación en la cuenta de almacenamiento <guid>
appKey La clave o el secreto de la aplicación registrados. <encrypted secret>
tenantId El identificador de inquilino de Microsoft Entra con el que se registra la aplicación <guid>

Ejemplos

En los ejemplos siguientes se usan las variables appId, appKey y tenantId. Inicializó esas variables anteriormente en el código, en función de un registro de aplicación de Azure: al que se le han concedido permisos de Colaborador de datos de blob de almacenamiento en el almacenamiento para la escritura, y los permisos de Lector de datos de blob de almacenamiento para la lectura.

Leer

Este código lee la entidad Person de la carpeta de Common Data Model con un manifiesto en mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json:

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

Escritura implícita solo mediante un esquema DataFrame

El código siguiente escribe el DataFrame df en una carpeta de Common Data Model con un manifiesto en mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json con una entidad de evento.

El código escribe datos de eventos como archivos Parquet, los comprime con gzipy los anexa a la carpeta. (El código agrega nuevos archivos sin eliminar archivos existentes).


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

Escritura explícita mediante una definición de entidad almacenada en Data Lake Storage

El código siguiente escribe el DataFrame df en una carpeta de Common Data Model con un manifiesto en https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json con la entidad Person. El código escribe datos de persona como nuevos archivos CSV (de forma predeterminada) que sobrescriben los archivos existentes en la carpeta.

El código recupera la definición de entidad Person de https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json.

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

Escritura explícita mediante una entidad definida en el repositorio de GitHub de Common Data Model

El código siguiente escribe el DataFrame df en una carpeta de Common Data Model con:

  • El manifiesto en https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json.
  • Un submanifiesto que contiene la entidad TeamMembership que se crea en un subdirectorio TeamMembership.

Los datos de TeamMembership se escriben en archivos CSV (como valor predeterminado) que sobrescriben los archivos de datos existentes. El código recupera la definición de entidad TeamMembership de la red CDN de Common Data Model en Pertenencia al equipo en applicationCommon.

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

Otras consideraciones

Asignación de tipos de datos de Spark a Common Data Model

El conector aplica las siguientes asignaciones de tipos de datos al convertir Common Data Model a o desde Spark.

Spark Common Data Model
ShortType SmallInteger
IntegerType Integer
LongType BigInteger
DateType Date
Timestamp DateTime (opcionalmente Time)
StringType String
DoubleType Double
DecimalType(x,y) Decimal (x,y) (la escala y precisión predeterminadas son 18,4)
FloatType Float
BooleanType Boolean
ByteType Byte

El conector no admite el tipo de datos de Common Data Model Binary.

Control de datos de Common Data Model de tipo Date, DateTime y DateTimeOffset

El conector de CDM de Spark controla los tipos de datos Date y DateTime de Common Data Model como normales para Spark y Parquet. En CSV, el conector lee y escribe esos tipos de datos en formato ISO 8601.

El conector interpreta los valores de tipo de datos DateTime de Common Data Model como UTC. En CSV, el conector escribe esos valores en formato ISO 8601. Un ejemplo es 2020-03-13 09:49:00Z.

Los valores DateTimeOffset de Common Data Model diseñados para registrar instantáneas de hora local se controlan de forma diferente en Spark y Parquet que en CSV. CSV y otros formatos pueden expresar un instante de hora local como una estructura que consta de una fecha y hora, como 2020-03-13 09:49:00-08:00. Parquet y Spark no admiten estas estructuras. En su lugar, usan un tipo de datos TIMESTAMP que permite registrar un instante en hora UTC (o en una zona horaria no especificada).

El conector CDM de Spark convierte un valor DateTimeOffset en CSV a la marca de tiempo en hora UTC. Este valor se conserva como una marca de tiempo en Parquet. Si el valor se conserva posteriormente en CSV, se serializará como un valor DateTimeOffset con un desplazamiento de +00:00. No se pierde la precisión temporal. Los valores serializados representan el mismo instante que los valores originales, aunque se pierda la diferencia horaria.

Los sistemas de Spark usan la hora del sistema como base de referencia y normalmente expresan la hora usando la hora local. Las horas UTC siempre se pueden calcular a través de la aplicación de la diferencia horaria del sistema local. Para los sistemas de Azure de todas las regiones, la hora del sistema siempre es UTC, por lo que todos los valores de marca de tiempo están normalmente en UTC. Cuando se usa una escritura implícita, donde una definición de Common Data Model se deriva de un DataFrame, las columnas de marca de tiempo se traducen a atributos con el tipo de datos DateTime de Common Data Model, lo que implica una hora UTC.

Si es importante conservar una hora local y los datos se procesarán en Spark o se conservarán en Parquet, le recomendamos usar un atributo DateTime y mantener la diferencia horaria en un atributo independiente. Por ejemplo, puede mantener la diferencia como un valor entero con signo que representa minutos. En Common Data Model, los valores DateTime están en UTC, por lo que debe aplicar la diferencia a la hora local de proceso.

En la mayoría de los casos, no es importante conservar la hora local. A menudo, las horas locales solo son necesarias en una interfaz de usuario para mayor comodidad del usuario y en función de la zona horaria del usuario, por lo que no almacenar una hora UTC suele ser una mejor solución.

Control de los datos de hora de Common Data Model

Spark no admite un tipo de datos Time explícito. Un atributo con el tipo de datos Time de Common Data Model se representa en un DataFrame de Spark como una columna con un tipo de datos Timestamp. Cuando el conector de CDM de Spark lee un valor de hora, la marca de tiempo del DataFrame se inicializa con la fecha de la época de Spark 01/01/1970 más el valor de hora como leído del origen.

Al usar escritura explícita, puede asignar una columna de marca de tiempo a un DateTime o atributo Time. Si asigna una marca de tiempo a un atributo Time, se quita la parte de fecha de la marca de tiempo.

Cuando se usa la escritura implícita, una columna de marca de tiempo se asigna de manera predeterminada a un atributo DateTime. Para asignar una columna de marca de tiempo a un atributo Time, debe agregar un objeto de metadatos a la columna en el DataFrame, que indique que la marca de tiempo debe interpretarse como un valor de tiempo. El código siguiente muestra cómo hacerlo en Scala:

val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))

Precisión del valor de tiempo

El conector de CDM de Spark admite valores de hora en DateTime o Time. Los segundos tienen hasta seis posiciones decimales, en función del formato de los datos del archivo que se va a leer (CSV o Parquet) o tal como se define en el DataFrame. El uso de seis posiciones decimales permite la precisión de unos segundos a microsegundos.

Nomenclatura y organización de carpetas y archivos

Al escribir en carpetas de Common Data Model, hay una organización de carpetas predeterminada. De manera predeterminada, los archivos de datos se escriben en carpetas creadas para la fecha actual, denominadas como 2010-07-31. Puede personalizar la estructura de carpetas y los nombres mediante la opción dateFolderFormat.

Los nombres de los archivos de datos se basan en el siguiente patrón: <entidad>-<iddetrabajo>-*.<formatodearchivo>.

Puede controlar el número de particiones de datos escritas mediante el método sparkContext.parallelize(). El número de particiones viene determinado por el número de ejecutores del clúster de Spark, o está especificado explícitamente. En el ejemplo de Scala siguiente se crea un DataFrame con dos particiones:

val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)

Este es un ejemplo de una escritura explícita definida por una definición de entidad a la que se hace referencia:

+-- <CDMFolder>
     |-- default.manifest.cdm.json     << with entity reference and partition info
     +-- <Entity>
          |-- <entity>.cdm.json        << resolved physical entity definition
          |-- <data folder>
          |-- <data folder>
          +-- ...                            

Este es un ejemplo de una escritura explícita con un submanifiesto:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
         |-- <entity>.cdm.json
         |-- <entity>.manifest.cdm.json << submanifest with partition info
         |-- <data folder>
         |-- <data folder>
         +-- ...

Este es un ejemplo de una escritura implícita en la que la definición de entidad se deriva de un esquema DataFrame:

+-- <CDMFolder>
    |-- default.manifest.cdm.json
    +-- <Entity>
         |-- <entity>.cdm.json          << resolved physical entity definition
         +-- LogicalDefinition
         |   +-- <entity>.cdm.json      << logical entity definitions
         |-- <data folder>
         |-- <data folder>
         +-- ...

Este es un ejemplo de una escritura implícita con un submanifiesto:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
        |-- <entity>.cdm.json           << resolved physical entity definition
        |-- <entity>.manifest.cdm.json  << submanifest with reference to the entity and partition info
        +-- LogicalDefinition
        |   +-- <entity>.cdm.json       << logical entity definitions
        |-- <data folder>
        |-- <data folder>
        +-- ...

Solución de problemas conocidos

  • Asegúrese de que la precisión decimal y la escala de los campos de tipo de datos decimales que use en DataFrame coincidan con el tipo de datos que se encuentra en la definición de entidad de Common Data Model. Si la precisión y la escala no se definen explícitamente en Common Data Model, el valor predeterminado que se usa es Decimal(18,4). En el caso de los archivos model.json, se supone que Decimal es Decimal(18,4).
  • Los nombres de carpeta y archivo de las siguientes opciones no deben incluir espacios ni caracteres especiales, como un signo igual (=): manifestPath, entityDefinitionModelRoot, entityDefinitionPath, dataFolderFormat.

Pasos siguientes

Ahora puede ver los demás conectores de Apache Spark: