Share via


Spark Common Data Model-connector voor Azure Synapse Analytics

De Spark Common Data Model-connector (Spark CDM-connector) is een indelingslezer/schrijver in Azure Synapse Analytics. Hiermee kan een Spark-programma Common Data Model-entiteiten lezen en schrijven in een map Common Data Model via Spark DataFrames.

Voor informatie over het definiëren van Common Data Model-documenten met Common Data Model 1.2 raadpleegt u dit artikel over wat Common Data Model is en hoe u dit kunt gebruiken.

Functies

Op hoog niveau ondersteunt de connector:

  • 3.1 en 3.2., en 3.3.
  • Gegevens lezen uit een entiteit in een map Common Data Model in een Spark DataFrame.
  • Schrijven van een Spark DataFrame naar een entiteit in een Common Data Model-map op basis van een Common Data Model-entiteitsdefinitie.
  • Schrijven vanuit een Spark DataFrame naar een entiteit in een Common Data Model-map op basis van het DataFrame-schema.

De connector ondersteunt ook:

  • Lezen en schrijven naar Common Data Model-mappen in Azure Data Lake Storage met een hiërarchische naamruimte (HNS) ingeschakeld.
  • Lezen uit Common Data Model-mappen die worden beschreven door manifestbestanden of model.json bestanden.
  • Schrijven naar Common Data Model-mappen die worden beschreven door een manifestbestand.
  • Gegevens in CSV-indeling met of zonder kolomkoppen en met door de gebruiker te selecteren scheidingstekens.
  • Gegevens in Apache Parquet-indeling, inclusief geneste Parquet.
  • Submanifesten voor lezen en optioneel gebruik van submanifesten met entiteitsbereik op schrijfbewerkingen.
  • Gegevens schrijven via door de gebruiker wijzigbare partitiepatronen.
  • Gebruik van beheerde identiteiten en referenties in Azure Synapse Analytics.
  • Het oplossen van aliaslocaties van Common Data Model die worden gebruikt in import via Common Data Model-adapterdefinities die worden beschreven in een config.json-bestand .

Beperkingen

De connector biedt geen ondersteuning voor de volgende mogelijkheden en scenario's:

  • Parallelle schrijfbewerkingen. We raden ze niet aan. Er is geen vergrendelingsmechanisme op de opslaglaag.
  • Programmatische toegang tot entiteitsmetagegevens nadat u een entiteit hebt gelezen.
  • Programmatische toegang om metagegevens in te stellen of te overschrijven wanneer u een entiteit schrijft.
  • Schemadrift, waarbij gegevens in een DataFrame die worden geschreven extra kenmerken bevatten die niet zijn opgenomen in de entiteitsdefinitie.
  • Schemaontwikkeling, waarbij entiteitspartities verwijzen naar verschillende versies van de entiteitsdefinitie. U kunt de versie controleren door deze uit te voeren com.microsoft.cdm.BuildInfo.version.
  • Schrijf ondersteuning voor model.json.
  • Gegevens schrijven Time naar Parquet. Op dit moment ondersteunt de connector het overschrijven van een tijdstempelkolom die moet worden geïnterpreteerd als een Common Data Model-waarde Time in plaats van alleen een DateTime waarde voor CSV-bestanden.
  • Het Parquet-type Map , matrices van primitieve typen en matrices van matrixtypen. Common Data Model ondersteunt deze momenteel niet, dus ook de Spark CDM-connector.

Voorbeelden

Als u de connector wilt gaan gebruiken, bekijkt u de voorbeeldcode en Common Data Model-bestanden.

Lezen van de gegevens

Wanneer de connector gegevens leest, worden metagegevens gebruikt in de map Common Data Model om het DataFrame te maken op basis van de definitie van de opgeloste entiteit voor de opgegeven entiteit, zoals wordt verwezen in het manifest. De connector maakt gebruik van entiteitskenmerknamen als DataFrame-kolomnamen. Het wijst kenmerkgegevenstypen toe aan kolomgegevenstypen. Wanneer het DataFrame wordt geladen, wordt het gevuld met de entiteitspartities die zijn geïdentificeerd in het manifest.

De connector zoekt in het opgegeven manifest en eventuele submanifesten op het eerste niveau voor de opgegeven entiteit. Als de vereiste entiteit zich in een tweede of lagere submanifest bevindt, of als er meerdere entiteiten met dezelfde naam in verschillende submanifesten staan, moet u het submanifest opgeven dat de vereiste entiteit bevat in plaats van het hoofdmanifest.

Entiteitspartities kunnen een combinatie van indelingen hebben (bijvoorbeeld CSV en Parquet). Alle entiteitsgegevensbestanden die in het manifest worden geïdentificeerd, worden gecombineerd tot één gegevensset, ongeacht de indeling en worden geladen in het DataFrame.

Wanneer de connector CSV-gegevens leest, wordt standaard de Spark-optie failfast gebruikt. Als het aantal kolommen niet gelijk is aan het aantal kenmerken in de entiteit, retourneert de connector een fout.

Vanaf 0.19 ondersteunt de connector ook de permissieve modus (alleen voor CSV-bestanden). Wanneer een CSV-rij een lager aantal kolommen heeft dan het entiteitsschema, wijst de connector null-waarden toe voor de ontbrekende kolommen. Wanneer een CSV-rij meer kolommen heeft dan het entiteitsschema, worden de kolommen groter dan het aantal kolommen van het entiteitsschema afgekapt tot het aantal schemakolommen. Gebruik is als volgt:

.option("mode", "permissive") or .option("mode", "failfast")

Gegevens schrijven

Wanneer de connector naar een Common Data Model-map schrijft en de entiteit nog niet in die map bestaat, maakt de connector een nieuwe entiteit en definitie. Hiermee voegt u de entiteit en definitie toe aan de map Common Data Model en verwijst u ernaar in het manifest.

De connector ondersteunt twee schrijfmodi:

  • Expliciet schrijven: de definitie van de fysieke entiteit is gebaseerd op een logische Common Data Model-entiteitsdefinitie die u opgeeft.

    De connector leest en lost de opgegeven logische entiteitsdefinitie op om de definitie van de fysieke entiteit te maken die wordt gebruikt in de map Common Data Model. Als importinstructies in een direct of indirect verwezen Common Data Model-definitiebestand aliassen bevatten, moet u een config.json-bestand opgeven dat deze aliassen toewijst aan Common Data Model-adapters en opslaglocaties.

    • Als het DataFrame-schema niet overeenkomt met de definitie van de entiteit waarnaar wordt verwezen, retourneert de connector een fout. Zorg ervoor dat de kolomgegevenstypen in het DataFrame overeenkomen met de kenmerkgegevenstypen in de entiteit, inclusief voor decimale gegevens, precisie en schaalset via kenmerken in Common Data Model.
    • Als het DataFrame niet overeenkomt met de entiteitsdefinitie, retourneert de connector een fout.
    • Als het DataFrame consistent is:
      • Als de entiteit al in het manifest bestaat, wordt de opgegeven entiteitsdefinitie door de connector omgezet en gevalideerd op basis van de definitie in de map Common Data Model. Als de definities niet overeenkomen, retourneert de connector een fout. Anders schrijft de connector gegevens en werkt de partitiegegevens in het manifest bij.
      • Als de entiteit niet bestaat in de map Common Data Model, schrijft de connector een opgeloste kopie van de entiteitsdefinitie naar het manifest in de map Common Data Model. De connector schrijft gegevens en werkt de partitiegegevens in het manifest bij.
  • Impliciet schrijven: de entiteitsdefinitie is afgeleid van de DataFrame-structuur.

    • Als de entiteit niet bestaat in de map Common Data Model, gebruikt de connector de impliciete definitie om de definitie van de opgeloste entiteit te maken in de map Common Data Model.

    • Als de entiteit bestaat in de map Common Data Model, valideert de connector de impliciete definitie op basis van de bestaande entiteitsdefinitie. Als de definities niet overeenkomen, retourneert de connector een fout. Anders schrijft de connector gegevens en schrijft deze afgeleide logische entiteitsdefinities naar een submap van de entiteitsmap.

      De connector schrijft gegevens naar gegevensmappen in een entiteitssubmap. Een opslagmodus bepaalt of de nieuwe gegevens worden overschreven of toegevoegd aan bestaande gegevens, of dat er een fout wordt geretourneerd als er gegevens bestaan. De standaardinstelling is om een fout te retourneren als er al gegevens bestaan.

Integratie van Common Data Model-alias

Common Data Model-definitiebestanden maken gebruik van aliassen in importinstructies om de importinstructies te vereenvoudigen en de locatie van de geïmporteerde inhoud tijdens runtime te laten zijn gebonden. Aliassen gebruiken:

  • Vereenvoudigt de organisatie van Common Data Model-bestanden, zodat gerelateerde Common Data Model-definities op verschillende locaties kunnen worden gegroepeerd.
  • Hiermee kan Common Data Model-inhoud tijdens runtime worden geopend vanaf verschillende geïmplementeerde locaties.

Het volgende codefragment toont het gebruik van aliassen in importinstructies in een Common Data Model-definitiebestand:

"imports": [  
{     
  "corpusPath": "cdm:/foundations.cdm.json"
},  
{       
  "corpusPath": "core:/TrackedEntity.cdm.json"  
},  
{      
  "corpusPath": "Customer.cdm.json"  
} 
]

In het voorgaande voorbeeld wordt cdm gebruikgemaakt van een alias voor de locatie van het basisbestand Common Data Model. Het wordt gebruikt core als een alias voor de locatie van het TrackedEntity definitiebestand.

Aliassen zijn tekstlabels die overeenkomen met een naamruimtewaarde in een adaptervermelding in een Common Data Model config.json-bestand . Een adaptervermelding specificeert het adaptertype (bijvoorbeeld adls, CDN, GitHubof local) en een URL die een locatie definieert. Sommige adapters ondersteunen andere configuratieopties, zoals een verbindingstime-out. Terwijl aliassen willekeurige tekstlabels zijn, wordt de cdm alias op een speciale manier behandeld.

De Spark CDM-connector zoekt in de hoofdlocatie van het model van de entiteitsdefinitie om het config.json-bestand te laden. Als het config.json bestand zich op een andere locatie bevindt of als u het config.json bestand in de hoofdmap van het model wilt overschrijven, kunt u de locatie van een config.json bestand opgeven met behulp van de configPath optie. Het bestand config.json moet adaptervermeldingen bevatten voor alle aliassen die worden gebruikt in de Common Data Model-code die wordt opgelost, of de connector meldt een fout.

De mogelijkheid om het config.json-bestand te overschrijven, betekent dat u runtime toegankelijke locaties kunt bieden voor Common Data Model-definities. Zorg ervoor dat de inhoud waarnaar tijdens runtime wordt verwezen, consistent is met de definities die zijn gebruikt toen Common Data Model oorspronkelijk werd geschreven.

Standaard verwijst de cdm alias naar de locatie van de standaard Common Data Model-definities op hoofdniveau, met inbegrip van het foundations.cdm.json-bestand . Dit bestand bevat de primitieve gegevenstypen Common Data Model en een kernset eigenschappendefinities die vereist zijn voor de meest Common Data Model-entiteitsdefinities.

U kunt de cdm alias, net als elke andere alias, oplossen met behulp van een adaptervermelding in het config.json-bestand . Als u geen adapter opgeeft of als u een null-vermelding opgeeft, wordt de cdm alias standaard omgezet in het Common Data Model Public Content Delivery Network (CDN) op https://cdm-schema.microsoft.com/logical/.

U kunt ook de cdmSource optie gebruiken om te overschrijven hoe de cdm alias wordt omgezet. Het gebruik van de cdmSource optie is handig als de cdm alias de enige alias is die wordt gebruikt in de Common Data Model-definities die worden omgezet, omdat het kan voorkomen dat u een config.json-bestand hoeft te maken of ernaar te verwijzen.

Parameters, opties en opslagmodus

Voor zowel lees- als schrijfbewerkingen geeft u de bibliotheeknaam van de Spark CDM-connector op als parameter. U gebruikt een set opties om het gedrag van de connector te parameteriseren. Wanneer u schrijft, ondersteunt de connector ook een opslagmodus.

De naam, opties en opslagmodus van de connectorbibliotheek worden als volgt opgemaakt:

  • dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
  • dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.\<saveMode\>)

Hier volgt een voorbeeld met enkele van de opties voor het gebruik van de connector voor leesbewerkingen:

val readDf = spark.read.format("com.microsoft.cdm")
  .option("storage", "mystorageaccount.dfs.core.windows.net")
  .option("manifestPath", "customerleads/default.manifest.cdm.json")
  .option("entity", "Customer")
  .load()

Algemene opties voor lezen en schrijven

De volgende opties identificeren de entiteit in de map Common Data Model waarnaar u leest of schrijft.

Optie Beschrijving Patroon en voorbeeldgebruik
storage De eindpunt-URL voor het Azure Data Lake Storage-account, waarvoor HNS is ingeschakeld, die de map Common Data Model bevat.
Gebruik de dfs.core.windows.net URL.
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net"
manifestPath Het relatieve pad naar het manifest of model.json bestand in het opslagaccount. Voor leesbewerkingen kan het een hoofdmanifest of een submanifest of een model.json-bestand zijn. Voor schrijfbewerkingen moet het een hoofdmanifest zijn. <container>/{<folderPath>}<manifestFileName>,
"mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json"
"models/hr/employees/model.json" (alleen-lezen)
entity De naam van de bron- of doelentiteit in het manifest. Wanneer u voor het eerst een entiteit in een map schrijft, geeft de connector deze naam aan de omgezette entiteitsdefinitie. De naam van de entiteit is hoofdlettergevoelig. <entityName>
"customer"
maxCDMThreads Het maximum aantal gelijktijdige leesbewerkingen terwijl de connector een entiteitsdefinitie oplost. Elk geldig geheel getal, zoals 5

Notitie

U hoeft niet langer een definitie van een logische entiteit op te geven naast de definitie van de fysieke entiteit in de map Common Data Model bij lezen.

Expliciete schrijfopties

Met de volgende opties wordt de definitie van de logische entiteit geïdentificeerd voor de entiteit die wordt geschreven. De definitie van de logische entiteit wordt omgezet in een fysieke definitie die definieert hoe de entiteit wordt geschreven.

Optie Beschrijving Patroon of voorbeeldgebruik
entityDefinitionStorage Het Azure Data Lake Storage-account dat de entiteitsdefinitie bevat. Vereist als deze verschilt van het opslagaccount dat als host fungeert voor de map Common Data Model. <accountName>.dfs.core.windows.net
"myAccount.dfs.core.windows.net"
entityDefinitionModelRoot De locatie van de modelhoofdmap of -verzameling binnen het account. <container>/<folderPath>
"crm/core"
entityDefinitionPath De locatie van de entiteit. Het is het bestandspad naar het Common Data Model-definitiebestand ten opzichte van de hoofdmap van het model, inclusief de naam van de entiteit in dat bestand. <folderPath>/<entityName>.cdm.json/<entityName>
"sales/customer.cdm.json/customer"
configPath Het container- en mappad naar een config.json-bestand dat de adapterconfiguraties bevat voor alle aliassen die zijn opgenomen in het entiteitsdefinitiebestand en eventuele direct of indirect verwezen Common Data Model-bestanden.

Deze optie is niet vereist als config.json zich in de hoofdmap van het model bevindt.
<container><folderPath>
useCdmStandardModelRoot Geeft aan dat de hoofdmap van het model zich bevindt op https://cdm-schema.microsoft.com/CDM/logical/. Wordt gebruikt om te verwijzen naar entiteitstypen die zijn gedefinieerd in het Common Data Model CDN. Onderdrukkingen entityDefinitionStorage en entityDefinitionModelRoot (indien opgegeven).
"useCdmStandardModelRoot"
cdmSource Definieert hoe de cdm alias (als deze aanwezig is in Common Data Model-definitiebestanden) wordt omgezet. Als u deze optie gebruikt, wordt elke cdm adapter overschreven die is opgegeven in het config.json-bestand . Waarden zijn builtin of referenced. De standaardwaarde is referenced.

Als u deze optie referencedinstelt, gebruikt de connector de meest recente gepubliceerde Common Data Model-definities op https://cdm-schema.microsoft.com/logical/. Als u deze optie builtininstelt, gebruikt de connector de Common Data Model-basisdefinities die zijn ingebouwd in het Common Data Model-objectmodel dat door de connector wordt gebruikt.

Notitie:
* De Spark CDM-connector maakt mogelijk geen gebruik van de nieuwste Common Data Model SDK, dus bevat deze mogelijk niet de meest recente gepubliceerde standaarddefinities.
* De ingebouwde definities bevatten alleen de inhoud van Common Data Model op het hoogste niveau, zoals foundations.cdm.json of primitives.cdm.json. Als u standaard Common Data Model-definities op lager niveau wilt gebruiken, gebruikt referenced of neemt u een cdm adapter op in config.json.
"builtin"|"referenced"

In het voorgaande voorbeeld is https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customerhet volledige pad naar het definitieobject van de klantentiteit. In dat pad zijn modellen de container in Azure Data Lake Storage.

Impliciete schrijfopties

Als u geen definitie van een logische entiteit opgeeft bij schrijven, wordt de entiteit impliciet geschreven op basis van het DataFrame-schema.

Wanneer u impliciet schrijft, wordt een tijdstempelkolom normaal gesproken geïnterpreteerd als een Common Data Model-gegevenstype DateTime . U kunt deze interpretatie overschrijven om een kenmerk van het gegevenstype Common Data Model Time te maken door een metagegevensobject op te geven dat is gekoppeld aan de kolom die het gegevenstype aangeeft. Zie Common Data Model-tijdgegevens verwerken verderop in dit artikel voor meer informatie.

Ondersteuning voor het schrijven van tijdgegevens bestaat alleen voor CSV-bestanden. Deze ondersteuning wordt momenteel niet uitgebreid naar Parquet.

Opties voor mapstructuur en gegevensindeling

U kunt de volgende opties gebruiken om de organisatie en bestandsindeling van mappen te wijzigen.

Optie Beschrijving Patroon of voorbeeldgebruik
useSubManifest Als true, zorgt ervoor dat de doelentiteit wordt opgenomen in het hoofdmanifest via een submanifest. Het submanifest en de entiteitsdefinitie worden geschreven in een entiteitsmap onder de hoofdmap. Standaard is false. "true"|"false"
format Hiermee definieert u de bestandsindeling. De huidige ondersteunde bestandsindelingen zijn CSV en Parquet. Standaard is csv. "csv"|"parquet"
delimiter Alleen CSV. Definieert het scheidingsteken dat u gebruikt. De standaardwaarde is een komma. "|"
columnHeaders Alleen CSV. Als true, voegt u een eerste rij toe aan gegevensbestanden met kolomkoppen. Standaard is true. "true"|"false"
compression Alleen schrijven. Alleen Parquet. Hiermee definieert u de compressie-indeling die u gebruikt. Standaard is snappy. "uncompressed" | "snappy" | "gzip" | "lzo"
dataFolderFormat Hiermee staat u een door de gebruiker gedefinieerde structuur van gegevensmappen binnen een entiteitsmap toe. Hiermee kunt u datum- en tijdwaarden vervangen door mapnamen met behulp van DateTimeFormatter opmaak. Inhoud zonder opmaak moet tussen enkele aanhalingstekens worden geplaatst. Standaardindeling is "yyyy"-"MM"-"dd", die mapnamen produceert zoals 2020-07-30. year "yyyy" / month "MM"
"Data"

Modus voor opslaan

De opslagmodus geeft aan hoe de connector bestaande entiteitsgegevens verwerkt in de map Common Data Model wanneer u een DataFrame schrijft. Opties zijn het overschrijven, toevoegen aan of retourneren van een fout als er al gegevens bestaan. De standaardmodus voor opslaan is ErrorIfExists.

Wijze Beschrijving
SaveMode.Overwrite Overschrijft de bestaande entiteitsdefinitie als deze wordt gewijzigd en vervangt bestaande gegevenspartities door de gegevenspartities die worden geschreven.
SaveMode.Append Voegt gegevens toe die worden geschreven in nieuwe partities naast de bestaande partities.

Deze modus biedt geen ondersteuning voor het wijzigen van het schema. Als het schema van de gegevens die worden geschreven niet compatibel is met de bestaande entiteitsdefinitie, genereert de connector een fout.
SaveMode.ErrorIfExists Retourneert een fout als er al partities bestaan.

Zie de sectie Map- en bestandsnaamgeving en organisatie verderop in dit artikel voor meer informatie over hoe gegevensbestanden worden benoemd en geordend op schrijven.

Verificatie

U kunt drie verificatiemodi gebruiken met de Spark CDM-connector om de metagegevens en gegevenspartities van Common Data Model te lezen of te schrijven: passthrough van referenties, SAS-token (Shared Access Signature) en app-registratie.

Referentie-passthrough

In Azure Synapse Analytics ondersteunt de Spark CDM-connector het gebruik van beheerde identiteiten voor Azure-resources om de toegang tot het Azure Data Lake Storage-account te mediaatiseren dat de map Common Data Model bevat. Er wordt automatisch een beheerde identiteit gemaakt voor elke Azure Synapse Analytics-werkruimte. De connector maakt gebruik van de beheerde identiteit van de werkruimte die het notebook bevat waarin de connector wordt aangeroepen om te verifiëren bij de opslagaccounts.

U moet ervoor zorgen dat de gekozen identiteit toegang heeft tot de juiste opslagaccounts:

  • Verlenen inzender machtigingen voor Opslagblobgegevens zodat de bibliotheek kan schrijven naar mappen van Common Data Model.
  • Verken opslagblobgegevenslezer machtigingen om alleen leestoegang toe te staan.

In beide gevallen zijn er geen extra connectoropties vereist.

Opties voor toegangsbeheer op basis van SAS-tokens

SAS-tokenreferenties zijn een extra optie voor verificatie bij opslagaccounts. Met SAS-tokenverificatie kan het SAS-token zich op container- of mapniveau bevinden. De juiste machtigingen zijn vereist:

  • Leesmachtigingen voor een manifest of partitie hebben alleen ondersteuning op leesniveau nodig.
  • Schrijfmachtigingen hebben zowel lees- als schrijfondersteuning nodig.
Optie Beschrijving Patroon en voorbeeldgebruik
sasToken Het SAS-token voor toegang tot het relatieve opslagaccount met de juiste machtigingen <token>

Opties voor toegangsbeheer op basis van referenties

Als alternatief voor het gebruik van een beheerde identiteit of een gebruikersidentiteit kunt u expliciete referenties opgeven om de Spark CDM-connector toegang te geven tot gegevens. Maak in Microsoft Entra ID een app-registratie. Verdeel deze app-registratie vervolgens toegang tot het opslagaccount met behulp van een van de volgende rollen:

  • Inzender voor opslagblobgegevens, zodat de bibliotheek kan schrijven naar Common Data Model-mappen
  • Opslagblobgegevenslezer om alleen leesmachtigingen toe te staan

Nadat u machtigingen hebt gemaakt, kunt u de app-id, app-sleutel en tenant-id doorgeven aan de connector bij elke aanroep ervan met behulp van de volgende opties. U wordt aangeraden Azure Key Vault te gebruiken om deze waarden op te slaan om ervoor te zorgen dat ze niet worden opgeslagen in duidelijke tekst in uw notebookbestand.

Optie Beschrijving Patroon en voorbeeldgebruik
appId De app-registratie-id voor verificatie bij het opslagaccount <guid>
appKey De geregistreerde app-sleutel of -geheim <encrypted secret>
tenantId De Tenant-id van Microsoft Entra waaronder de app is geregistreerd <guid>

Voorbeelden

In de volgende voorbeelden worden alle variabelen gebruiktappIdappKey, en tenantId variabelen. U hebt deze variabelen eerder in de code geïnitialiseerd op basis van een Azure-app-registratie: Machtigingen voor inzender voor opslagblobgegevens voor de opslag voor schrijven en machtigingen voor leesbewerkingen voor Opslagblobgegevenslezer.

Read

Met deze code wordt de Person entiteit uit de map Common Data Model gelezen met een manifest in mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json:

val df = spark.read.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .load()

Impliciet schrijven met alleen een DataFrame-schema

Met de volgende code wordt het df DataFrame naar een map Common Data Model geschreven met een manifest naar mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json een gebeurtenisentiteit.

De code schrijft gebeurtenisgegevens als Parquet-bestanden, comprimeert deze met gzipen voegt deze toe aan de map. (Met de code worden nieuwe bestanden toegevoegd zonder bestaande bestanden te verwijderen.)


df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
 .option("entity", "Event")
 .option("format", "parquet")
 .option("compression", "gzip")
 .mode(SaveMode.Append)
 .save()

Expliciet schrijven met behulp van een entiteitsdefinitie die is opgeslagen in Data Lake Storage

Met de volgende code wordt het df DataFrame naar een map Common Data Model geschreven met een manifest https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json bij de Person entiteit. De code schrijft persoonsgegevens als nieuwe CSV-bestanden (standaard) die bestaande bestanden in de map overschrijven.

Met de code wordt de Person entiteitsdefinitie opgehaald uit https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json.

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
 .option("entity", "Person")
 .option("entityDefinitionModelRoot", "cdmmodels/core")
 .option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
 .mode(SaveMode.Overwrite)
 .save()

Expliciet schrijven met behulp van een entiteit die is gedefinieerd in de GitHub-opslagplaats Common Data Model

Met de volgende code wordt het df DataFrame naar een map Common Data Model geschreven met:

  • Het manifest bij https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json.
  • Een submanifest dat de TeamMembership entiteit bevat die is gemaakt in een submap TeamMembership .

TeamMembership gegevens worden geschreven naar CSV-bestanden (de standaardinstelling) die bestaande gegevensbestanden overschrijven. Met de code wordt de TeamMembership entiteitsdefinitie opgehaald uit het Common Data Model CDN bij Teamlidmaatschap in applicationCommon.

df.write.format("com.microsoft.cdm")
 .option("storage", "mystorage.dfs.core.windows.net")
 .option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
 .option("entity", "TeamMembership")
 .option("useCdmStandardModelRoot", true)
 .option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
 .option("useSubManifest", true)
 .mode(SaveMode.Overwrite)
 .save()

Andere overwegingen

Gegevenstypen van Spark toewijzen aan Common Data Model

De connector past de volgende gegevenstypetoewijzingen toe wanneer u Common Data Model converteert naar of van Spark.

Spark Common Data Model
ShortType SmallInteger
IntegerType Integer
LongType BigInteger
DateType Date
Timestamp DateTime (optioneel Time)
StringType String
DoubleType Double
DecimalType(x,y) Decimal (x,y) (standaardschaal en precisie zijn 18,4)
FloatType Float
BooleanType Boolean
ByteType Byte

De connector biedt geen ondersteuning voor het gegevenstype Common Data Model Binary .

Common Data Model Date, DateTime en DateTimeOffset-gegevens verwerken

De Spark CDM-connector verwerkt Common Data Model Date en DateTime gegevenstypen zoals normaal voor Spark en Parquet. In CSV leest en schrijft de connector deze gegevenstypen in ISO 8601-indeling.

De connector interpreteert waarden van het gegevenstype Common Data Model DateTime als UTC. In CSV schrijft de connector deze waarden in ISO 8601-indeling. Een voorbeeld is 2020-03-13 09:49:00Z.

Common Data Model-waarden DateTimeOffset die zijn bedoeld voor het vastleggen van lokale tijds instants, worden anders verwerkt in Spark en Parquet van CSV. CSV en andere indelingen kunnen een lokale tijd direct uitdrukken als een structuur die bestaat uit een datum/tijd, zoals 2020-03-13 09:49:00-08:00. Parquet en Spark ondersteunen dergelijke structuren niet. In plaats daarvan gebruiken ze een TIMESTAMP gegevenstype waarmee direct kan worden vastgelegd in UTC (of in een niet-opgegeven tijdzone).

De Spark CDM-connector converteert een DateTimeOffset waarde in CSV naar een UTC-tijdstempel. Deze waarde blijft behouden als een tijdstempel in Parquet. Als de waarde later wordt bewaard in CSV, wordt deze geserialiseerd als een DateTimeOffset waarde met een offset van +00:00. Er is geen verlies van tijdelijke nauwkeurigheid. De geserialiseerde waarden vertegenwoordigen hetzelfde moment als de oorspronkelijke waarden, hoewel de offset verloren gaat.

Spark-systemen gebruiken hun systeemtijd als basislijn en drukken normaal gesproken tijd uit met behulp van die lokale tijd. UTC-tijden kunnen altijd worden berekend via de toepassing van het lokale systeemverschil. Voor Azure-systemen in alle regio's is de systeemtijd altijd UTC, dus alle tijdstempelwaarden zijn normaal gesproken in UTC. Wanneer u een impliciete schrijfbewerking gebruikt, waarbij een Common Data Model-definitie wordt afgeleid van een DataFrame, worden tijdstempelkolommen vertaald naar kenmerken met het gegevenstype Common Data Model DateTime , wat een UTC-tijd impliceert.

Als het belangrijk is om een lokale tijd te behouden en de gegevens worden verwerkt in Spark of persistent zijn in Parquet, raden we u aan een DateTime kenmerk te gebruiken en de offset in een afzonderlijk kenmerk te behouden. U kunt de offset bijvoorbeeld behouden als een ondertekende geheel getalwaarde die minuten vertegenwoordigt. In Common Data Model bevinden datum/tijd-waarden zich in UTC, dus u moet de offset toepassen om lokale tijd te berekenen.

In de meeste gevallen is het behouden van lokale tijd niet belangrijk. Lokale tijden zijn vaak alleen vereist in een gebruikersinterface voor het gemak van gebruikers en op basis van de tijdzone van de gebruiker, dus het opslaan van een UTC-tijd is vaak een betere oplossing.

Common Data Model-tijdgegevens verwerken

Spark biedt geen ondersteuning voor een expliciet Time gegevenstype. Een kenmerk met het gegevenstype Common Data Model Time wordt weergegeven in een Spark DataFrame als een kolom met een Timestamp gegevenstype. Wanneer de Spark CDM-connector een tijdwaarde leest, wordt het tijdstempel in het DataFrame geïnitialiseerd met de Spark-epochdatum 01/01/1970 plus de tijdwaarde die uit de bron wordt gelezen.

Wanneer u expliciet schrijven gebruikt, kunt u een tijdstempelkolom toewijzen aan een DateTime of Time kenmerk. Als u een tijdstempel toe te wijzen aan een Time kenmerk, wordt het datumgedeelte van de tijdstempel verwijderd.

Wanneer u impliciet schrijven gebruikt, wordt standaard een tijdstempelkolom toegewezen aan een DateTime kenmerk. Als u een tijdstempelkolom wilt toewijzen aan een Time kenmerk, moet u een metagegevensobject toevoegen aan de kolom in het DataFrame dat aangeeft dat het tijdstempel moet worden geïnterpreteerd als een tijdwaarde. De volgende code laat zien hoe u dit doet in Scala:

val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))

Nauwkeurigheid van tijdwaarde

De Spark CDM-connector ondersteunt tijdwaarden in DateTime of Time. Seconden hebben maximaal zes decimalen, op basis van de indeling van de gegevens in het bestand dat wordt gelezen (CSV of Parquet) of zoals gedefinieerd in het DataFrame. Het gebruik van zes decimalen maakt nauwkeurigheid van enkele seconden tot microseconden mogelijk.

Naamgeving en organisatie van mappen en bestanden

Wanneer u naar de mappen Common Data Model schrijft, is er een standaardorganisatie voor mappen. Gegevensbestanden worden standaard geschreven in mappen die zijn gemaakt voor de huidige datum, met de naam 2010-07-31. U kunt de mapstructuur en namen aanpassen met behulp van de dateFolderFormat optie.

Namen van gegevensbestanden zijn gebaseerd op het volgende patroon: <entity-jobid>><-*.<bestandsopmaak>.

U kunt het aantal gegevenspartities beheren dat is geschreven met behulp van de sparkContext.parallelize() methode. Het aantal partities wordt bepaald door het aantal uitvoerders in het Spark-cluster of expliciet opgegeven. In het volgende Scala-voorbeeld wordt een DataFrame met twee partities gemaakt:

val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)

Hier volgt een voorbeeld van een expliciete schrijfbewerking die is gedefinieerd door een definitie van een entiteit waarnaar wordt verwezen:

+-- <CDMFolder>
     |-- default.manifest.cdm.json     << with entity reference and partition info
     +-- <Entity>
          |-- <entity>.cdm.json        << resolved physical entity definition
          |-- <data folder>
          |-- <data folder>
          +-- ...                            

Hier volgt een voorbeeld van een expliciete schrijfbewerking met een submanifest:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
         |-- <entity>.cdm.json
         |-- <entity>.manifest.cdm.json << submanifest with partition info
         |-- <data folder>
         |-- <data folder>
         +-- ...

Hier volgt een voorbeeld van een impliciete schrijfbewerking waarin de entiteitsdefinitie is afgeleid van een DataFrame-schema:

+-- <CDMFolder>
    |-- default.manifest.cdm.json
    +-- <Entity>
         |-- <entity>.cdm.json          << resolved physical entity definition
         +-- LogicalDefinition
         |   +-- <entity>.cdm.json      << logical entity definitions
         |-- <data folder>
         |-- <data folder>
         +-- ...

Hier volgt een voorbeeld van een impliciete schrijfbewerking met een submanifest:

+-- <CDMFolder>
    |-- default.manifest.cdm.json       << contains reference to submanifest
    +-- <Entity>
        |-- <entity>.cdm.json           << resolved physical entity definition
        |-- <entity>.manifest.cdm.json  << submanifest with reference to the entity and partition info
        +-- LogicalDefinition
        |   +-- <entity>.cdm.json       << logical entity definitions
        |-- <data folder>
        |-- <data folder>
        +-- ...

Probleemoplossing en bekende problemen

  • Zorg ervoor dat de decimale precisie en schaal van decimale gegevenstypevelden die u in het DataFrame gebruikt, overeenkomen met het gegevenstype in de entiteitsdefinitie Common Data Model. Als de precisie en schaal niet expliciet zijn gedefinieerd in Common Data Model, is de standaardwaarde Decimal(18,4). Voor model.json bestanden Decimal wordt ervan uitgegaan dat Decimal(18,4).
  • Map- en bestandsnamen in de volgende opties mogen geen spaties of speciale tekens bevatten, zoals een gelijkteken (=): manifestPath, , , entityDefinitionPath. dataFolderFormatentityDefinitionModelRoot

Volgende stappen

U kunt nu de andere Apache Spark-connectors bekijken: