Delen via


Complexe gegevenstypen analyseren in Azure Synapse Analytics

Dit artikel is relevant voor Parquet-bestanden en -containers in Azure Synapse Link voor Azure Cosmos DB. U kunt Spark of SQL gebruiken om gegevens te lezen of transformeren met complexe schema's, zoals matrices of geneste structuren. Het volgende voorbeeld is voltooid met één document, maar kan eenvoudig worden geschaald naar miljarden documenten met Spark of SQL. De code in dit artikel maakt gebruik van PySpark (Python).

Gebruiksscenario

Complexe gegevenstypen komen steeds vaker voor en vormen een uitdaging voor data engineers. Het analyseren van geneste schema's en matrices kan tijdrovende en complexe SQL-query's omvatten. Daarnaast kan het lastig zijn om de naam van het gegevenstype geneste kolommen te wijzigen of te casten. Als u met diep geneste objecten werkt, kunt u ook prestatieproblemen ondervinden.

Data engineers moeten begrijpen hoe ze complexe gegevenstypen efficiënt kunnen verwerken en ze gemakkelijk toegankelijk kunnen maken voor iedereen. In het volgende voorbeeld gebruikt u Spark in Azure Synapse Analytics om objecten te lezen en te transformeren in een platte structuur via gegevensframes. U gebruikt het serverloze model van SQL in Azure Synapse Analytics om dergelijke objecten rechtstreeks op te vragen en deze resultaten als een gewone tabel te retourneren.

Wat zijn matrices en geneste structuren?

Het volgende object is afkomstig van Application Insights. In dit object zijn er geneste structuren en matrices die geneste structuren bevatten.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Schemavoorbeeld van matrices en geneste structuren

Wanneer u het schema van het gegevensframe van het object ( df) afdrukt met de opdracht df.printschema, ziet u de volgende weergave:

  • Geel vertegenwoordigt geneste structuren.
  • Groen vertegenwoordigt een matrix met twee elementen.

Code met gele en groene markering, met schema-oorsprong

_rid, _tsen _etag zijn toegevoegd aan het systeem omdat het document is opgenomen in het transactionele archief van Azure Cosmos DB.

Het voorgaande gegevensframe telt alleen voor 5 kolommen en 1 rij. Na de transformatie heeft het gecureerde gegevensframe 13 kolommen en 2 rijen, in tabelvorm.

Geneste structuren plat maken en matrices exploderen

Met Spark in Azure Synapse Analytics kunt u geneste structuren eenvoudig transformeren in kolommen en matrixelementen in meerdere rijen. Gebruik de volgende stappen voor implementatie.

Stroomdiagram met stappen voor Spark-transformaties

Een functie definiëren om het geneste schema af te vlakken

U kunt deze functie zonder wijziging gebruiken. Maak een cel in een PySpark-notebook met de volgende functie:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

De functie gebruiken om het geneste schema af te vlakken

In deze stap vlakt u het geneste schema van het gegevensframe (df) af in een nieuw gegevensframe (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

De weergavefunctie moet 10 kolommen en 1 rij retourneren. De matrix en de geneste elementen zijn er nog.

De matrix transformeren

Hier transformeert u de matrix, context_custom_dimensionsin het gegevensframe df_flat, in een nieuw gegevensframe df_flat_explode. In de volgende code definieert u ook welke kolom u wilt selecteren:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

De weergavefunctie moet 10 kolommen en 2 rijen retourneren. De volgende stap bestaat uit het plat maken van geneste schema's met de functie die is gedefinieerd in stap 1.

De functie gebruiken om het geneste schema af te vlakken

Ten slotte gebruikt u de functie om het geneste schema van het gegevensframe plat te maken, in een nieuw gegevensframe df_flat_explode, df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

De weergavefunctie moet 13 kolommen en 2 rijen weergeven.

De functie printSchema van het gegevensframe df_flat_explode_flat retourneert het volgende resultaat:

Code met het uiteindelijke schema

Matrices en geneste structuren rechtstreeks lezen

Met het serverloze model van SQL kunt u weergaven en tabellen op dergelijke objecten opvragen en maken.

Afhankelijk van hoe de gegevens zijn opgeslagen, moeten gebruikers eerst de volgende taxonomie gebruiken. Alles wat in hoofdletters wordt weergegeven, is specifiek voor uw use-case:

Bulk Notatie
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' Parquet (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' CosmosDB (Azure Synapse Link)

Vervang elk veld als volgt:

  • 'UW BULK HIERBOVEN' is het verbindingsreeks van de gegevensbron waarmee u verbinding maakt.
  • 'UW TYPE HIERBOVEN' is de indeling die u gebruikt om verbinding te maken met de bron.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Er zijn twee verschillende typen bewerkingen:

  • Het eerste bewerkingstype wordt aangegeven in de volgende coderegel, die de kolom contextdataeventTime definieert die naar het geneste element verwijst. Context.Data.eventTime

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Deze regel definieert de kolom met de naam contextdataeventTime die verwijst naar het geneste element, Context>Data>eventTime.

  • Het tweede bewerkingstype gebruikt cross apply om nieuwe rijen te maken voor elk element onder de matrix. Vervolgens wordt elk genest object gedefinieerd.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Als de matrix 5 elementen met vier geneste structuren heeft, retourneert het serverloze model van SQL 5 rijen en 4 kolommen. Het serverloze model van SQL kan query's uitvoeren, de matrix in twee rijen toewijzen en alle geneste structuren weergeven in kolommen.

Volgende stappen