Complexe gegevenstypen analyseren in Azure Synapse Analytics
Dit artikel is relevant voor Parquet-bestanden en -containers in Azure Synapse Link voor Azure Cosmos DB. U kunt Spark of SQL gebruiken om gegevens te lezen of transformeren met complexe schema's, zoals matrices of geneste structuren. Het volgende voorbeeld is voltooid met één document, maar kan eenvoudig worden geschaald naar miljarden documenten met Spark of SQL. De code in dit artikel maakt gebruik van PySpark (Python).
Gebruiksscenario
Complexe gegevenstypen komen steeds vaker voor en vormen een uitdaging voor data engineers. Het analyseren van geneste schema's en matrices kan tijdrovende en complexe SQL-query's omvatten. Daarnaast kan het lastig zijn om de naam van het gegevenstype geneste kolommen te wijzigen of te casten. Als u met diep geneste objecten werkt, kunt u ook prestatieproblemen ondervinden.
Data engineers moeten begrijpen hoe ze complexe gegevenstypen efficiënt kunnen verwerken en ze gemakkelijk toegankelijk kunnen maken voor iedereen. In het volgende voorbeeld gebruikt u Spark in Azure Synapse Analytics om objecten te lezen en te transformeren in een platte structuur via gegevensframes. U gebruikt het serverloze model van SQL in Azure Synapse Analytics om dergelijke objecten rechtstreeks op te vragen en deze resultaten als een gewone tabel te retourneren.
Wat zijn matrices en geneste structuren?
Het volgende object is afkomstig van Application Insights. In dit object zijn er geneste structuren en matrices die geneste structuren bevatten.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Schemavoorbeeld van matrices en geneste structuren
Wanneer u het schema van het gegevensframe van het object ( df) afdrukt met de opdracht df.printschema
, ziet u de volgende weergave:
- Geel vertegenwoordigt geneste structuren.
- Groen vertegenwoordigt een matrix met twee elementen.
_rid
, _ts
en _etag
zijn toegevoegd aan het systeem omdat het document is opgenomen in het transactionele archief van Azure Cosmos DB.
Het voorgaande gegevensframe telt alleen voor 5 kolommen en 1 rij. Na de transformatie heeft het gecureerde gegevensframe 13 kolommen en 2 rijen, in tabelvorm.
Geneste structuren plat maken en matrices exploderen
Met Spark in Azure Synapse Analytics kunt u geneste structuren eenvoudig transformeren in kolommen en matrixelementen in meerdere rijen. Gebruik de volgende stappen voor implementatie.
Een functie definiëren om het geneste schema af te vlakken
U kunt deze functie zonder wijziging gebruiken. Maak een cel in een PySpark-notebook met de volgende functie:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
De functie gebruiken om het geneste schema af te vlakken
In deze stap vlakt u het geneste schema van het gegevensframe (df) af in een nieuw gegevensframe (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
De weergavefunctie moet 10 kolommen en 1 rij retourneren. De matrix en de geneste elementen zijn er nog.
De matrix transformeren
Hier transformeert u de matrix, context_custom_dimensions
in het gegevensframe df_flat
, in een nieuw gegevensframe df_flat_explode
. In de volgende code definieert u ook welke kolom u wilt selecteren:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
De weergavefunctie moet 10 kolommen en 2 rijen retourneren. De volgende stap bestaat uit het plat maken van geneste schema's met de functie die is gedefinieerd in stap 1.
De functie gebruiken om het geneste schema af te vlakken
Ten slotte gebruikt u de functie om het geneste schema van het gegevensframe plat te maken, in een nieuw gegevensframe df_flat_explode
, df_flat_explode_flat
:
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
De weergavefunctie moet 13 kolommen en 2 rijen weergeven.
De functie printSchema
van het gegevensframe df_flat_explode_flat
retourneert het volgende resultaat:
Matrices en geneste structuren rechtstreeks lezen
Met het serverloze model van SQL kunt u weergaven en tabellen op dergelijke objecten opvragen en maken.
Afhankelijk van hoe de gegevens zijn opgeslagen, moeten gebruikers eerst de volgende taxonomie gebruiken. Alles wat in hoofdletters wordt weergegeven, is specifiek voor uw use-case:
Bulk | Notatie |
---|---|
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' | Parquet (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' | CosmosDB (Azure Synapse Link) |
Vervang elk veld als volgt:
- 'UW BULK HIERBOVEN' is het verbindingsreeks van de gegevensbron waarmee u verbinding maakt.
- 'UW TYPE HIERBOVEN' is de indeling die u gebruikt om verbinding te maken met de bron.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Er zijn twee verschillende typen bewerkingen:
Het eerste bewerkingstype wordt aangegeven in de volgende coderegel, die de kolom
contextdataeventTime
definieert die naar het geneste element verwijst.Context.Data.eventTime
contextdataeventTime varchar(50) '$.context.data.eventTime'
Deze regel definieert de kolom met de naam
contextdataeventTime
die verwijst naar het geneste element,Context>Data>eventTime
.Het tweede bewerkingstype gebruikt
cross apply
om nieuwe rijen te maken voor elk element onder de matrix. Vervolgens wordt elk genest object gedefinieerd.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Als de matrix 5 elementen met vier geneste structuren heeft, retourneert het serverloze model van SQL 5 rijen en 4 kolommen. Het serverloze model van SQL kan query's uitvoeren, de matrix in twee rijen toewijzen en alle geneste structuren weergeven in kolommen.