Spark Common Data Model-Connector für Azure Synapse Analytics
Der Spark Common Data Model-Connector (Spark CDM-Connector) ist ein Formatleser/Formatschreiber in Azure Synapse Analytics. Es ermöglicht einem Spark-Programm, Common Data Model-Entitäten in einem Common Data Model-Ordner über Spark-DataFrames zu lesen und zu schreiben.
Informationen zum Definieren von Common Data Model-Dokumenten mithilfe von Common Data Model 1.2 finden Sie in diesem Artikel über die Funktion von Common Data Model und dessen Verwendung.
Funktionen
Auf hoher Ebene unterstützt der Connector Folgendes:
- 3.1 und 3.2 und 3.3.
- Lesen von Daten aus einer Entität in einem Common Data Model-Ordner in ein Spark-DataFrame.
- Schreiben aus einem Spark-DataFrame in eine Entität in einem Common Data Model-Ordner basierend auf einer Common Data Model-Entitätsdefinition.
- Schreiben aus einem Spark-DataFrame in eine Entität in einem Common Data Model-Ordner basierend auf dem DataFrame-Schema.
Der Connector unterstützt auch Folgendes:
- Lesen und Schreiben in Common Data Model-Ordnern in Azure Data Lake Storage mit aktiviertem hierarchischem Namespace (HNS).
- Lesen aus Common Data Model-Ordnern, die entweder durch Manifest- oder model.json-Dateien beschrieben werden.
- Schreiben in Common Data Model-Ordner, die durch eine Manifestdatei beschrieben werden.
- Daten im CSV-Format mit oder ohne Spaltenüberschriften und mit vom Benutzer auswählbaren Trennzeichen.
- Daten im Apache Parquet-Format, einschließlich geschachteltes Parquet.
- Untermanifeste beim Lesen und die optionale Verwendung von Untermanifesten mit Entitäts-Geltungsbereich beim Schreiben.
- Schreiben von Daten über vom Benutzer änderbare Partitionsmuster.
- Verwendung von verwalteten Identitäten und Anmeldeinformationen in Azure Synapse Analytics.
- Auflösen von Common Data Model-Aliasspeicherorten, die in Importen verwendet werden, über Common Data Model-Adapterdefinitionen, die in einer config.json-Datei beschrieben werden.
Einschränkungen
Der Connector unterstützt die folgenden Funktionalitäten und Szenarien nicht:
- Parallele Schreibvorgänge. Wir empfehlen diese nicht. Auf der Speicherebene gibt es keinen Sperrmechanismus.
- Programmgesteuerter Zugriff auf Entitätsmetadaten nachdem Sie eine Entität lesen.
- Programmgesteuerter Zugriff zum Festlegen oder Überschreiben von Metadaten, wenn Sie eine Entität schreiben.
- Schemaabweichung, wenn Daten in einem DataFrame, das geschrieben wird, zusätzliche Attribute enthalten, die nicht in der Entitätsdefinition enthalten sind.
- Schemaentwicklung, wenn Entitätspartitionen auf verschiedene Versionen der Entitätsdefinition verwiesen. Sie können die Version überprüfen, indem Sie
com.microsoft.cdm.BuildInfo.version
ausführen. - Schreibunterstützung für model.json.
- Schreiben von
Time
-Daten in Parquet. Derzeit unterstützt der Connector nur für CSV-Dateien das Überschreiben einer Zeitstempelspalte, die als Common Data Model-WertTime
und nicht alsDateTime
-Wert interpretiert wird. - Der Parquet-Typ
Map
, Arrays von primitiven Typen und Arrays von Arraytypen. Common Data Model unterstützt diese derzeit nicht, also auch nicht der Spark CDM-Connector.
Beispiele
Um mit der Verwendung des Connectors zu beginnen, sehen Sie sich den Beispielcode und die Common Data Model-Dateien an.
Lesen von Daten
Wenn der Connector Daten liest, verwendet er Metadaten im Common Data Model-Ordner, um das DataFrame basierend auf der aufgelösten Entitätsdefinition für die angegebene Entität zu erstellen, auf die im Manifest verwiesen wird. Der Connector verwendet Entitätsattributnamen als DataFrame-Spaltennamen. Er ordnet Attributdatentypen den Spaltendatentypen zu. Wenn das DataFrame geladen wird, wird es auf der Grundlage der im Manifest identifizierten Entitätspartitionen aufgefüllt.
Der Connector sucht im angegebenen Manifest sowie in allen Untermanifesten der ersten Ebene nach der angegebenen Entität. Wenn sich die benötigte Entität in einem Untermanifest der zweiten Ebene oder tiefer befindet, oder wenn mehrere Entitäten des gleichen Namens in verschiedenen Untermanifesten vorhanden sind, sollten Sie anstelle des Stammmanifests das Untermanifest angeben, das die benötigte Entität enthält.
Entitätspartitionen können aus einer Mischung von Formaten bestehen (z. B. CSV und Parquet). Alle im Manifest identifizierten Entitätsdatendateien werden unabhängig vom Format zu einem einzelnen Dataset zusammengefasst und in das DataFrame geladen.
Wenn der Connector CSV-Daten liest, verwendet er standardmäßig die Spark-Option failfast
. Wenn die Anzahl der Spalten nicht der Anzahl der Attribute in der Entität entspricht, gibt der Connector einen Fehler zurück.
Alternativ unterstützt der Connector ab Version 0.19 den freizügigen Modus (nur für CSV-Dateien). Im freizügigen Modus wird der Connector fehlenden Spalten NULL-Werte zuweisen, wenn eine CSV-Zeile weniger Spalten enthält als das Entitätsschema. Wenn eine CSV-Zeile mehr Spalten enthält als das Entitätsschema, werden die Spalten, die über die Spaltenanzahl des Entitätsschemas hinausgehen, auf die Spaltenanzahl des Schemas gekürzt. Syntax:
.option("mode", "permissive") or .option("mode", "failfast")
Schreiben von Daten
Wenn der Connector in einen Common Data Model-Ordner schreibt und die Entität in diesem Ordner noch nicht vorhanden ist, erstellt der Connector eine neue Entität und Definition. Er fügt die Entität und die Definition dem Common Data Model-Ordner hinzu und verweist im Manifest darauf.
Der Connector unterstützt zwei Schreibmodi:
Explizites Schreiben: Die physische Entitätsdefinition basiert auf einer logischen Common Data Model-Entitätsdefinition, die Sie angeben.
Der Connector liest die angegebene Definition der logischen Entität und löst sie auf, um die Definition der physischen Entität zu erstellen, die im Common Data Model-Ordner verwendet wird. Wenn Importanweisungen in einer direkt oder indirekt referenzierten Common Data Model-Definitionsdatei Aliase enthalten, müssen Sie eine config.json-Datei bereitstellen, welche diese Aliase Common Data Model-Adaptern und Speicherorten zuordnet.
- Wenn das DataFrame-Schema nicht mit der Entitätsdefinition übereinstimmt, auf die verwiesen wird, gibt der Connector einen Fehler zurück. Stellen Sie sicher, dass die Spaltendatentypen im DataFrame den Attributdatentypen in der Entität entsprechen, auch für Dezimaldaten, Genauigkeit und Skalierungsgruppe über Merkmale im Common Data Model.
- Wenn das DataFrame Inkonsistenzen mit der Entitätsdefinition aufweist, gibt der Connector einen Fehler zurück.
- Wenn das DataFrame konsistent ist:
- Wenn die Entität im Manifest bereits vorhanden ist, löst der Connector die bereitgestellte Entitätsdefinition auf und überprüft sie anhand der Definition im Common Data Model-Ordner. Wenn die Definitionen nicht übereinstimmen, gibt der Connector einen Fehler zurück. Andernfalls schreibt der Connector Daten und aktualisiert die Partitionsinformationen im Manifest.
- Wenn die Entität im Common Data Model-Ordner nicht vorhanden ist, schreibt der Connector eine aufgelöste Kopie der Entitätsdefinition in das Manifest im Common Data Model-Ordner. Der Connector schreibt Daten und aktualisiert die Partitionsinformationen im Manifest.
Implizites Schreiben: Die Entitätsdefinition wird von der DataFrame-Struktur abgeleitet.
Wenn die Entität im Common Data Model-Ordner nicht vorhanden ist, verwendet der Connector die implizite Definition, um die aufgelöste Entitätsdefinition im Common Data Model-Zielordner zu erstellen.
Wenn die Entität im Common Data Model-Ordner vorhanden ist, überprüft der Connector die implizite Definition anhand der vorhandenen Entitätsdefinition. Wenn die Definitionen nicht übereinstimmen, gibt der Connector einen Fehler zurück. Andernfalls schreibt der Connector Daten und schreibt abgeleitete logische Entitätsdefinitionen in einen Unterordner des Entitätsordners.
Der Connector schreibt Daten in Datenordner innerhalb eines Entitätsunterordners. Mithilfe eines Speichermodus wird gesteuert, ob vorhandene Daten mit den neuen Daten überschrieben werden, ob die neuen Daten an bereits vorhandene Daten angefügt werden oder ob ein Fehler zurückgegeben wird, wenn bereits Daten vorhanden sind. Standardmäßig wird ein Fehler zurückgegeben, wenn bereits Daten vorhanden sind.
Integration des Common Data Model-Alias
Common Data Model-Definitionsdateien verwenden Aliase in Importanweisungen, um die Importanweisung zu vereinfachen und die späte Bindung des Speicherorts der importierten Inhalte zur Laufzeit zu ermöglichen. Die Verwendung von Aliasen bietet folgende Vorteile:
- Erleichtert die Organisation von Common Data Model-Dateien, so dass verwandte Common Data Model-Definitionen an verschiedenen Speicherorten gruppiert werden können.
- Ermöglicht den Zugriff auf Common Data Model-Inhalte über verschiedene bereitgestellte Speicherorte zur Laufzeit.
Der folgende Codeausschnitt zeigt die Verwendung von Aliasen in Importanweisungen in einer Common Data Model-Definitionsdatei:
"imports": [
{
"corpusPath": "cdm:/foundations.cdm.json"
},
{
"corpusPath": "core:/TrackedEntity.cdm.json"
},
{
"corpusPath": "Customer.cdm.json"
}
]
Das vorstehende Beispiel verwendet cdm
als Alias für den Speicherort der Common Data Model-Grundlagendatei. Es verwendet core
als Alias für den Speicherort der TrackedEntity
-Definitionsdatei.
Aliase sind Textbeschriftungen, die mit einem Namespacewert in einem Adaptereintrag in einer Common Data Model-Datei mit Namen config.json abgeglichen werden. Ein Adaptereintrag gibt den Adaptertyp (z. B. adls
, CDN
, GitHub
oder local
) und eine URL an, die einen Speicherort definiert. Einige Adapter unterstützen andere Konfigurationsoptionen, etwa ein Verbindungstimeout. Während es sich bei Aliasen um beliebige Textbeschriftungen handelt, wird der Alias cdm
auf besondere Weise behandelt.
Der Spark-CDM-Connector sucht am Modellstammspeicherort der Entitätsdefinition nach der zu ladenden Datei config.json. Wenn sich die Datei config.json an einem anderen Speicherort befindet, oder wenn Sie die Datei config.json im Modellstamm überschreiben möchte, können Sie den Speicherort einer config.json-Datei mithilfe der Option configPath
angeben. Die config.json-Datei muss Adaptereinträge für alle Aliase enthalten, die im aufgelösten Common Data Model-Code verwendet werden, oder der Connector meldet einen Fehler.
Die Möglichkeit, die config.json-Datei zu überschreiben, bedeutet, dass Sie Speicherorte für Common Data Model-Definitionen bereitstellen können, auf die bei Laufzeit zugegriffen werden kann. Stellen Sie sicher, dass der zur Laufzeit referenzierte Inhalt mit den Definitionen konsistent ist, die bei der ursprünglichen Common Data Model-Erstellung verwendet wurden.
Gemäß Konvention bezieht sich der cdm
-Alias auf den Speicherort der Common Data Model-Standarddefinitionen auf Stammebene, einschließlich der Datei foundations.cdm.json. Diese Datei enthält die primitiven Common Data Model-Datentypen und einen Kernsatz von Merkmalsdefinitionen, die für die meisten Common Data Model-Entitätsdefinitionen erforderlich sind.
Sie können den cdm
-Alias wie jeden anderen Alias mithilfe eines Adaptereintrags in der config.json-Datei auflösen. Wenn Sie keinen Adapter angeben oder einen NULL-Eintrag bereitstellen, wird der cdm
-Alias standardmäßig in das öffentliche Common Data Model-CDN (Content Delivery Network) unter https://cdm-schema.microsoft.com/logical/
aufgelöst.
Sie können auch die cdmSource
-Option verwenden, um zu überschreiben, wie der cdm
-Alias aufgelöst wird. Die Verwendung der Option cdmSource
ist hilfreich, wenn der cdm
-Alias der einzige in den aufzulösenden Common Data Model-Definitionen verwendete Alias ist, da sich dadurch das Erstellen oder Referenzieren einer config.json-Datei vermeiden lässt.
Parameter, Optionen und Speichermodus
Sowohl für Lese- wie auch Schreibvorgänge geben Sie den Bibliotheksnamen des Spark CDM-Connectors als Parameter an. Sie verwenden ein Reihe von Optionen, um das Verhalten des Connectors zu parametrisieren. Wenn Sie schreiben, unterstützt der Connector auch einen Speichermodus.
Der Name der Connectorbibliothek, die Optionen und der Speichermodus werden wie folgt formatiert:
dataframe.read.format("com.microsoft.cdm") [.option("option", "value")]*
dataframe.write.format("com.microsoft.cdm") [.option("option", "value")]* .mode(savemode.\<saveMode\>)
Hier sehen Sie ein Beispiel, das einige der Optionen bei der Verwendung des Connectors für Lesevorgänge zeigt:
val readDf = spark.read.format("com.microsoft.cdm")
.option("storage", "mystorageaccount.dfs.core.windows.net")
.option("manifestPath", "customerleads/default.manifest.cdm.json")
.option("entity", "Customer")
.load()
Allgemeine Optionen für Lese- und Schreibvorgänge
Die folgenden Optionen identifizieren die Entität im Common Data Model-Ordner, aus dem Sie lesen oder in den Sie schreiben.
Option | Beschreibung | Muster und Beispielverwendung |
---|---|---|
storage |
Die Endpunkt-URL für das Azure Data Lake Storage-Konto mit aktiviertem HNS, das den Common Data Model-Ordner enthält. Verwenden Sie die dfs.core.windows.net -URL. |
<accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net" |
manifestPath |
Der relative Pfad zum Manifest oder zur model.json-Datei im Speicherkonto. Bei Lesevorgängen kann es sich um ein Stammmanifest, ein Untermanifest oder eine model.json-Datei handeln. Bei Schreibvorgängen muss es sich um ein Stammmanifest handeln. | <container>/{<folderPath>}<manifestFileName> , "mycontainer/default.manifest.cdm.json" "models/hr/employees.manifest.cdm.json" "models/hr/employees/model.json" (nur lesen) |
entity |
Der Name der Quell- oder Zielentität im Manifest. Beim erstmaligen Schreiben einer Entität in einen Ordner gibt der Connector der aufgelösten Entitätsdefinition diesem Namen. Beim Entitätsnamen muss die Groß- und Kleinschreibung berücksichtigt werden. | <entityName> "customer" |
maxCDMThreads |
Die maximale Anzahl gleichzeitiger Lesevorgänge, während der Connector eine Entitätsdefinition auflöst. | Eine beliebige gültige ganze Zahl, z. B. 5 |
Hinweis
Beim Lesen müssen Sie nicht mehr zusätzlich zur physischen Entitätsdefinition im Common Data Model-Ordner eine logische Entitätsdefinition angeben.
Optionen für explizites Schreiben
Mit den folgenden Optionen wird die logische Entitätsdefinition für die Entität definiert, die geschrieben wird. Die logische Entitätsdefinition wird in eine physische Definition aufgelöst, die definiert, wie die Entität geschrieben wird.
Option | Beschreibung | Muster- oder Beispielverwendung |
---|---|---|
entityDefinitionStorage |
Das Azure Data Lake Storage-Konto, das die Entitätsdefinition enthält. Erforderlich, wenn es sich von dem Speicherkonto unterscheidet, das den Common Data Model-Ordner hostet. | <accountName>.dfs.core.windows.net "myAccount.dfs.core.windows.net" |
entityDefinitionModelRoot |
Der Speicherort des Modellstamms oder Korpus innerhalb des Kontos | <container>/<folderPath> "crm/core" |
entityDefinitionPath |
Der Speicherort der Entität. Dies ist der Dateipfad zur Common Data Model-Definitionsdatei relativ zum Modellstamm, einschließlich des Namens der Entität in dieser Datei. | <folderPath>/<entityName>.cdm.json/<entityName> "sales/customer.cdm.json/customer" |
configPath |
Der Container und der Ordnerpfad zu einer config.json-Datei mit den Adapterkonfigurationen für alle Aliase, die in der Entitätsdefinitionsdatei enthalten sind, sowie alle direkt oder indirekt referenzierten Common Data Model-Dateien. Diese Option ist nicht erforderlich, wenn sich config.json im Modellstammordner befindet. |
<container><folderPath> |
useCdmStandardModelRoot |
Gibt an, dass sich der Modellstamm unter https://cdm-schema.microsoft.com/CDM/logical/ befindet. Wird verwendet, um auf Entitätstypen zu verweisen, die im Common Data Model-CDN definiert sind. Überschreibt entityDefinitionStorage und entityDefinitionModelRoot (falls angegeben). |
"useCdmStandardModelRoot" |
cdmSource |
Definiert, wie der cdm -Alias aufgelöst wird (sofern er in Common Data Model-Definitionsdateien vorhanden ist). Wenn Sie diese Option verwenden, überschreibt sie alle cdm -Adapter, die in der config.json-Datei angegeben sind. Die verfügbaren Werte sind builtin oder referenced . Der Standardwert ist referenced .Wenn Sie diese Option auf referenced festlegen, verwendet der Connector die aktuellen veröffentlichten Common Data Model-Standarddefinitionen unter https://cdm-schema.microsoft.com/logical/ . Wenn Sie diese Option auf builtin festlegen, verwendet der Connector die Common Data Model-Basisdefinitionen, die in das vom Connector verwendete Common Data Model-Objektmodell integriert sind. Hinweis: * Der Spark CDM-Connector verwendet möglicherweise nicht das aktuelle Common Data Model-SDK und enthält daher möglicherweise nicht die aktuellen veröffentlichten Standarddefinitionen. * Die integrierten Definitionen enthalten nur den Common Data Model-Inhalt der obersten Ebene, z. B. foundations.cdm.json oder primitives.cdm.json. Wenn Sie Common Data Model-Standarddefinitionen auf niedrigerer Ebene verwenden möchten, verwenden Sie entweder referenced , oder schließen sie in config.json einencdm -Adapter ein. |
"builtin" |"referenced" |
Im vorstehenden Beispiel lautet der vollständige Pfad zum Definitionsobjekt der Kundenentität https://myAccount.dfs.core.windows.net/models/crm/core/sales/customer.cdm.json/customer
. In diesem Pfad sind Modelle der Container in Azure Data Lake Storage.
Optionen für implizites Schreiben
Wenn Sie beim Schreiben keine logische Entitätsdefinition angeben, wird die Entität implizit basierend auf dem DataFrame-Schema geschrieben.
Wenn Sie implizit schreiben, wird eine Zeitstempelspalte normalerweise als Common Data Model-Datentyp DateTime
interpretiert. Sie können diese Interpretation überschreiben, um ein Attribut des Common Data Model-Datentyps Time
zu erstellen, indem Sie ein Metadatenobjekt bereitstellen, das der Spalte zugeordnet ist, die den Datentyp angibt. Details finden Sie unter Behandeln von Common Data Model-Zeitdaten später in diesem Artikel.
Unterstützung für das Schreiben von Zeitdaten ist nur für CSV-Dateien vorhanden. Diese Unterstützung erstreckt sich derzeit nicht auf Parquet.
Optionen für Ordnerstruktur und Datenformat
Sie können die folgenden Optionen verwenden, um die Ordnerorganisation und das Dateiformat zu ändern.
Option | Beschreibung | Muster- oder Beispielverwendung |
---|---|---|
useSubManifest |
Wenn true , wird die Zielentität über ein Untermanifest in das Stammmanifest eingeschlossen. Das Untermanifest und die Entitätsdefinition werden in einen Entitätsordner unterhalb des Stamms geschrieben. Der Standardwert ist false . |
"true" |"false" |
format |
Definiert das Dateiformat. Aktuell werden die Dateiformate CSV und Parquet unterstützt. Der Standardwert ist csv . |
"csv" |"parquet" |
delimiter |
Gilt nur für CSV. Definiert das von Ihnen verwendete Trennzeichen. Standardmäßig wird ein Komma verwendet. | "|" |
columnHeaders |
Gilt nur für CSV. Wenn true , wird Datendateien eine erste Zeile mit Spaltenüberschriften hinzugefügt. Der Standardwert ist true . |
"true" |"false" |
compression |
Nur Schreibzugriff. Gilt nur für Parquet. Definiert das von Ihnen verwendete Komprimierungsformat. Der Standardwert ist snappy . |
"uncompressed" | "snappy" | "gzip" | "lzo" |
dataFolderFormat |
Erlaubt für eine durch Benutzer definierbare Datenordnerstruktur innerhalb eines Entitätsordners. Ermöglicht Ihnen das Ersetzen von Datums- und Zeitwerten in Ordnernamen mithilfe von DateTimeFormatter -Formatierungen. Inhalte die nicht von Formatierern stammen, müssen in einfache Anführungszeichen gesetzt werden. Das Standardformat ist "yyyy"-"MM"-"dd" , das Ordnernamen wie 2020-07-30 erzeugt. |
year "yyyy" / month "MM" "Data" |
Speichermodus
Der Speichermodus gibt an, wie der Connector vorhandene Entitätsdaten im Common Data Model-Ordner behandelt, wenn Sie ein DataFrame schreiben. Optionen sind Überschreiben, Anfügen an oder Rückgabe eines Fehlers, wenn bereits Daten vorhanden sind. Der Standardspeichermodus ist ErrorIfExists
.
Mode | Beschreibung |
---|---|
SaveMode.Overwrite |
Überschreibt die vorhandene Entitätsdefinition, wenn sie geändert wird, und ersetzt vorhandene Datenpartitionen durch die zu schreibenden Datenpartitionen. |
SaveMode.Append |
Fügt Daten, die in neue Partitionen geschrieben werden, an die bestehenden Partitionen an. Dieser Modus unterstützt das Ändern des Schemas nicht. Wenn das Schema der Daten, die geschrieben werden, mit der vorhandenen Entitätsdefinition nicht kompatibel ist, löst der Connector einen Fehler aus. |
SaveMode.ErrorIfExists |
Gibt einen Fehler zurück, wenn Partitionen bereits vorhanden sind. |
Ausführliche Informationen dazu, wie Datendateien beim Schreiben benannt und organisiert werden, finden Sie im Abschnitt Ordner- und Dateibenennung und Organisation später in diesem Artikel.
Authentifizierung
Sie können drei Authentifizierungsmodi mit dem Spark CDM-Connector verwenden, um die Common Data Model-Metadaten und -Datenpartitionen zu lesen oder zu schreiben: Passthrough von Anmeldeinformationen, SAS (Shared Access Signature)-Token und App-Registrierung.
Passthrough für Anmeldeinformationen
In Azure Synapse Analytics unterstützt der Spark CDM-Connector die Verwendung verwalteter Identitäten für Azure-Ressourcen, um den Zugriff auf das Azure Data Lake Storage-Konto zu vermitteln, das den Common Data Model-Ordner enthält. Eine verwaltete Identität wird automatisch für jeden Azure Synapse Analytics-Arbeitsbereich erstellt. Der Connector verwendet die verwaltete Identität des Arbeitsbereichs, der das Notebook enthält, in dem der Connector aufgerufen wird, um sich bei den Speicherkonten zu authentifizieren.
Sie müssen sicherstellen, dass die ausgewählte Identität Zugriff auf die entsprechenden Speicherkonten hat:
- Erteilen Sie Berechtigungen als Storage-Blob-Datenmitwirkender, damit die Bibliothek in Common Data Model-Ordner schreiben kann.
- Erteilen Sie Berechtigungen als Storage-Blob-Datenleser, um nur Lesezugriff zuzulassen.
In beiden Fällen sind keine weiteren Connectoroptionen erforderlich.
Optionen für die auf SAS-Token basierte Zugriffssteuerung
Anmeldeinformationen von SAS-Token sind eine zusätzliche Option für die Authentifizierung bei Speicherkonten. Bei der Authentifizierung mit SAS-Token kann sich das SAS-Token auf Container- oder Ordnerebene befinden. Die entsprechenden Berechtigungen sind erforderlich:
- Leseberechtigungen für ein Manifest oder eine Partition benötigen nur Unterstützung auf Leseebene.
- Schreibberechtigungen erfordern sowohl Lese- als auch Schreibunterstützung.
Option | Beschreibung | Muster und Beispielverwendung |
---|---|---|
sasToken |
Das SAS-Token für den Zugriff auf das relative Speicherkonto mit den richtigen Berechtigungen | <token> |
Optionen für die auf Anmeldeinformationen basierende Zugriffssteuerung
Als Alternative zur Verwendung einer verwalteten Identität oder einer Benutzeridentität können Sie explizite Anmeldeinformationen bereitstellen, damit der Spark-CDM-Connector auf Daten zugreifen kann. Erstellen Sie in Microsoft Entra ID eine App-Registrierung. Dann gewähren Sie dieser App-Registrierung Zugriff auf das Speicherkonto, indem Sie eine der folgenden Rollen verwenden:
- Storage-Blob-Datenmitwirkender, damit die Bibliothek in Common Data Model-Ordner schreiben kann
- Storage-Blob-Datenleser, um nur Leseberechtigungen zuzulassen
Nachdem Sie die Berechtigungen erstellt haben, können Sie die App-ID, den App-Schlüssel und die Mandanten-ID bei jedem Aufruf an den Connector mithilfe der folgenden Optionen übergeben. Wir empfehlen, dass Sie den Azure Key Vault verwenden, um diese Werte zu speichern, damit sichergestellt ist, dass sie nicht als Klartext in Ihrer Notebook-Datei gespeichert werden.
Option | Beschreibung | Muster und Beispielverwendung |
---|---|---|
appId |
Die App-Registrierungs-ID für die Authentifizierung beim Speicherkonto | <guid> |
appKey |
Der Schlüssel oder das Geheimnis der registrierten App | <encrypted secret> |
tenantId |
Die Microsoft Entra-Mandanten-ID, unter der die App registriert ist | <guid> |
Beispiele
In den folgenden Beispielen verwenden alle die Variablen appId
, appKey
und tenantId
. Sie haben diese Variablen weiter oben im Code basierend auf einer Azure-App-Registrierung initialisiert: Berechtigungen als Storage-Blob-Datenmitwirkender auf dem Speicher zum Schreiben, und Berechtigungen als Storage-Blob-Datenleser zum Lesen.
Lesen
Dieser Code liest die Person
-Entität aus dem Common Data Model-Ordner mit einem Manifest in mystorage.dfs.core.windows.net/cdmdata/contacts/root.manifest.cdm.json
:
val df = spark.read.format("com.microsoft.cdm")
.option("storage", "mystorage.dfs.core.windows.net")
.option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
.option("entity", "Person")
.load()
Implizites Schreiben nur mithilfe eines DataFrame-Schemas
Der folgende Code schreibt das df
-DataFrame in einen Common Data Model-Ordner mit einem Manifest in mystorage.dfs.core.windows.net/cdmdata/Contacts/default.manifest.cdm.json
mit einer Ereignisentität.
Der Code schreibt Ereignisdaten als Parquet-Dateien, komprimiert sie mit gzip
, und fügt sie an den Ordner an. (Der Code fügt neue Dateien hinzu, ohne vorhandene Dateien zu löschen.)
df.write.format("com.microsoft.cdm")
.option("storage", "mystorage.dfs.core.windows.net")
.option("manifestPath", "cdmdata/Contacts/default.manifest.cdm.json")
.option("entity", "Event")
.option("format", "parquet")
.option("compression", "gzip")
.mode(SaveMode.Append)
.save()
Expliziter Schreibvorgang mithilfe einer Entitätsdefinition, die in Data Lake Storage gespeichert ist
Der folgende Code schreibt das df
-DataFrame in einen Common Data Model-Ordner mit einem Manifest in https://_mystorage_.dfs.core.windows.net/cdmdata/Contacts/root.manifest.cdm.json
mit der Person
-Entität. Der Code schreibt Personendaten als neue CSV-Dateien (standardmäßig), die vorhandene Dateien im Ordner überschreiben.
Der Code ruft die Person
-Entitätsdefinition aus https://_mystorage_.dfs.core.windows.net/models/cdmmodels/core/Contacts/Person.cdm.json
ab.
df.write.format("com.microsoft.cdm")
.option("storage", "mystorage.dfs.core.windows.net")
.option("manifestPath", "cdmdata/contacts/root.manifest.cdm.json")
.option("entity", "Person")
.option("entityDefinitionModelRoot", "cdmmodels/core")
.option("entityDefinitionPath", "/Contacts/Person.cdm.json/Person")
.mode(SaveMode.Overwrite)
.save()
Expliziter Schreibvorgang mithilfe einer Entität, die im Common Data Model-GitHub-Repository definiert ist
Der folgende Code schreibt das df
-DataFrame mit in einen Common Data Model-Ordner mit:
- Das Manifest unter
https://_mystorage_.dfs.core.windows.net/cdmdata/Teams/root.manifest.cdm.json
. - Ein Untermanifest, das die
TeamMembership
-Entität enthält, die in einem TeamMembership-Unterverzeichnis erstellt wird.
TeamMembership
-Daten werden in CSV-Dateien (der Standard) geschrieben,die alle bereits vorhandenen Datendateien überschreiben. Der Code ruft die Definition der TeamMembership
-Entität aus dem Common Data Model-CDN aus der Teammitgliedschaft in applicationCommon ab.
df.write.format("com.microsoft.cdm")
.option("storage", "mystorage.dfs.core.windows.net")
.option("manifestPath", "cdmdata/Teams/root.manifest.cdm.json")
.option("entity", "TeamMembership")
.option("useCdmStandardModelRoot", true)
.option("entityDefinitionPath", "core/applicationCommon/TeamMembership.cdm.json/TeamMembership")
.option("useSubManifest", true)
.mode(SaveMode.Overwrite)
.save()
Weitere Überlegungen
Zuordnen von Datentypen von Spark zum Common Data Model
Der Connector wendet die folgenden Datentypzuordnungen an, wenn Sie Common Data Model in oder aus Spark konvertieren.
Spark | Common Data Model |
---|---|
ShortType |
SmallInteger |
IntegerType |
Integer |
LongType |
BigInteger |
DateType |
Date |
Timestamp |
DateTime (optional Time ) |
StringType |
String |
DoubleType |
Double |
DecimalType(x,y) |
Decimal (x,y) (Standardskalierung und -genauigkeit sind 18,4 ) |
FloatType |
Float |
BooleanType |
Boolean |
ByteType |
Byte |
Der Connector unterstützt den Common Data Model-Datentyp Binary
nicht.
Behandeln von Date-, DateTime- und DateTimeOffset-Daten im Common Data Model
Der Spark CDM-Connector verarbeitet Common Data Model-Datentypen Date
und DateTime
wie gewohnt für Spark und Parquet. In CSV liest und schreibt der Connector diese Datentypen im ISO 8601-Format.
Der Connector interpretiert Common Data Model-Datentypenwerte DateTime
als UTC. In CSV schreibt der Connector diese Werte im ISO 8601-Format. z. B. 2020-03-13 09:49:00Z
.
Common Data Model-Werte DateTimeOffset
, die für die Erfassung lokaler Zeitpunkte vorgesehen sind, werden in Spark und Parquet anders behandelt als in CSV. CSV- und andere Formate können einen lokalen Zeitpunkt als eine Struktur ausdrücken, die einen „datetime“-Datentyp umfasst, z. B 2020-03-13 09:49:00-08:00
. Parquet und Spark unterstützen solche Strukturen nicht. Sie verwenden stattdessen einen TIMESTAMP
-Datentyp, der es ermöglicht, einen Zeitpunkt in UTC (oder in einer nicht spezifizierten Zeitzone) aufzuzeichnen.
Der Spark-CDM-Connector konvertiert einen DateTimeOffset
-Wert in CSV in einen UTC-Zeitstempel. Dieser Wert wird als Zeitstempel in Parquet beibehalten. Wenn der Wert später in CSV beibehalten wird, wird er als DateTimeOffset
-Wert mit einem Offset von +00:00 serialisiert. Es gibt keinen Verlust an temporaler Genauigkeit. Die serialisierten Werte stellen den gleichen Zeitpunkt dar wie die ursprünglichen Werte, auch wenn der Offset verloren geht.
Spark-Systemen verwenden ihre Systemzeit als Baseline und drücken Zeit normalerweise unter Verwendung dieser lokalen Zeit aus. UTC-Zeiten können jederzeit mithilfe dem lokalen Systemoffset berechnet werden. Bei Azure-Systemen in allen Regionen ist die Systemzeit immer UTC, sodass normalerweise alle Zeitstempelwerte in UTC vorliegen. Wenn Sie einen impliziten Schreibvorgang verwenden, bei dem eine Common Data Model-Definition von einem DataFrame abgeleitet wird, werden Zeitstempelspalten in Attribute mit dem Common Data Model-Datentyp DateTime
übersetzt, was eine UTC-Zeit impliziert.
Wenn es wichtig ist, eine lokale Zeit beizubehalten und die Daten in Spark verarbeitet oder in Parquet beibehalten werden sollen, empfehlen wir, dass Sie ein DateTime
-Attribut verwenden und das Offset in einem separaten Attribut zu speichern. Beispielsweise können Sie das Offset als ganzzahligen Wert mit Vorzeichen beibehalten, der Minuten darstellt. Im Common Data Model liegen DateTime-Werte in UTC vor, deshalb müssen Sie den Offset auf die lokale Computezeit anwenden.
In den meisten Fällen ist die Speicherung der Ortszeit nicht wichtig. Lokale Zeiten sind auf einer Benutzeroberfläche häufig nur aus Gründen der Benutzerfreundlichkeit und basierend auf der Zeitzone des Benutzers erforderlich, sodass es oftmals besser ist, die UTC-Zeit nicht zu speichern.
Behandeln von Common Data Model-Zeitdaten
Spark unterstützt keinen expliziten Time
-Datentyp. Ein Attribut mit dem Common Data Model-Datentyp Time
wird in einem Spark-DataFrame als eine Spalte mit einem Timestamp
-Datentyp dargestellt. Wenn der Spark CDM-Connector einen Zeitwert liest, wird der Zeitstempel im DataFrame mit dem Spark-Epochendatum 01.01.1970 plus dem Zeitwert initialisiert, der aus der Quelle gelesen wird.
Wenn Sie explizites Schreiben verwenden, können Sie eine Zeitstempelspalte entweder einem DateTime
- oder Time
-Attribut zuordnen. Wenn Sie einem Attribut einen Time
-Zeitstempel zuordnen, wird der Datumsteil des Zeitstempels entfernt.
Wenn Sie implizites Schreiben verwenden, wird standardmäßig eine Zeitstempelspalte einem DateTime
-Attribut zugeordnet. Um eine Zeitstempelspalte einem Time
-Attribut zuzuordnen, müssen Sie der Spalte im DataFrame ein Metadatenobjekt hinzufügen, das angibt, dass der Zeitstempel als Zeitwert interpretiert werden soll. Der folgende Code zeigt, wie Sie dies in Scala tun:
val md = new MetadataBuilder().putString(“dataType”, “Time”)
val schema = StructType(List(
StructField(“ATimeColumn”, TimeStampType, true, md))
Genauigkeit des Zeitwerts
Der Spark CDM-Connector unterstützt Zeitwerte in entweder DateTime
oder Time
. Sekunden haben bis zu sechs Dezimalstellen, basierend auf dem Format der Daten in der Datei, die gelesen wird (CSV oder Parquet), oder wie im DataFrame definiert. Die Verwendung von sechs Dezimalstellen ermöglicht eine Genauigkeit von einzelnen Sekunden bis Mikrosekunden.
Benennung und Organisation von Ordnern und Dateien
Wenn Sie in Common Data Model-Ordner schreiben, gibt es eine Standardordnerorganisation. Datendateien werden standardmäßig in Ordner geschrieben, die für das aktuelle Datum erstellt werden und einen Namen haben wir 2010-07-31. Sie können die Ordnerstruktur und die Namen mithilfe der dateFolderFormat
-Option anpassen.
Datendateinamen basieren auf dem folgenden Muster: <Entität>-<Auftrags-ID>-*.<Dateiformat>.
Sie können die Anzahl der geschriebenen Datenpartitionen mithilfe der sparkContext.parallelize()
-Methode steuern. Die Anzahl von Partitionen wird entweder durch die Anzahl von Executors im Spark-Cluster bestimmt oder kann explizit angegeben werden. Das folgende Scala-Beispiel erstellt ein DataFrame mit zwei Partitionen:
val df= spark.createDataFrame(spark.sparkContext.parallelize(data, 2), schema)
Hier sehen Sie ein Beispiel für einen expliziten Schreibvorgang, der durch eine Entitätsdefinition definiert wird, auf die verwiesen wird:
+-- <CDMFolder>
|-- default.manifest.cdm.json << with entity reference and partition info
+-- <Entity>
|-- <entity>.cdm.json << resolved physical entity definition
|-- <data folder>
|-- <data folder>
+-- ...
Hier sehen Sie ein Beispiel für einen expliziten Schreibvorgang mit einem Untermanifest:
+-- <CDMFolder>
|-- default.manifest.cdm.json << contains reference to submanifest
+-- <Entity>
|-- <entity>.cdm.json
|-- <entity>.manifest.cdm.json << submanifest with partition info
|-- <data folder>
|-- <data folder>
+-- ...
Hier sehen Sie ein Beispiel für einen impliziten Schreibvorgang, bei dem die Entitätsdefinition von einem DataFrame-Schema abgeleitet wird:
+-- <CDMFolder>
|-- default.manifest.cdm.json
+-- <Entity>
|-- <entity>.cdm.json << resolved physical entity definition
+-- LogicalDefinition
| +-- <entity>.cdm.json << logical entity definitions
|-- <data folder>
|-- <data folder>
+-- ...
Hier sehen Sie ein Beispiel für einen impliziten Schreibvorgang mit einem Untermanifest:
+-- <CDMFolder>
|-- default.manifest.cdm.json << contains reference to submanifest
+-- <Entity>
|-- <entity>.cdm.json << resolved physical entity definition
|-- <entity>.manifest.cdm.json << submanifest with reference to the entity and partition info
+-- LogicalDefinition
| +-- <entity>.cdm.json << logical entity definitions
|-- <data folder>
|-- <data folder>
+-- ...
Problembehandlung und bekannte Probleme
- Stellen Sie sicher, dass die Dezimalgenauigkeit und die Skala der dezimalen Datentypenfelder, die Sie im DataFrame verwenden, mit dem Datentyp übereinstimmen, der in der Common Data Model-Entitätsdefinition enthalten ist. Wenn die Genauigkeit und die Skalierung im Common Data Model nicht explizit definiert sind, lautet der Standardwert
Decimal(18,4)
. Für model.json-Dateien wirdDecimal
alsDecimal(18,4)
angenommen. - Ordner- und Dateinamen in den folgenden Optionen dürfen keine Leerzeichen oder Sonderzeichen enthalten, z. B. ein Gleichheitszeichen (=):
manifestPath
,entityDefinitionModelRoot
,entityDefinitionPath
,dataFolderFormat
.
Nächste Schritte
Als Nächstes können Sie sich mit den anderen Apache Spark-Connectors vertraut machen: