Megosztás a következőn keresztül:


Összetett adattípusok elemzése Azure Synapse Analyticsben

Ez a cikk az Azure Cosmos DB-hez készült Azure Synapse Link parquet-fájljaira és tárolóira vonatkozik. A Spark vagy az SQL használatával összetett sémákkal, például tömbökkel vagy beágyazott struktúrákkal olvashatja vagy alakíthatja át az adatokat. Az alábbi példa egyetlen dokumentummal fejeződik be, de könnyedén méretezhető több milliárd dokumentumra a Spark vagy az SQL használatával. A cikkben szereplő kód a PySparkot (Python) használja.

Használati eset

Az összetett adattípusok egyre gyakoribbak, és kihívást jelentenek az adatmérnökök számára. A beágyazott sémák és tömbök elemzése időigényes és összetett SQL-lekérdezéseket igényelhet. Emellett nehéz lehet átnevezni vagy átírni a beágyazott oszlopok adattípusát. Ha mélyen beágyazott objektumokkal dolgozik, teljesítménybeli problémákba ütközhet.

Az adatmérnököknek tisztában kell lenniük azzal, hogyan dolgozzák fel hatékonyan az összetett adattípusokat, és hogyan teszik őket mindenki számára könnyen elérhetővé. Az alábbi példában a Sparkot használja az Azure Synapse Analyticsben az objektumok adatkereteken keresztüli olvasására és átalakítására egybesimított szerkezetté. Az SQL kiszolgáló nélküli modelljét használja az Azure Synapse Analyticsben az ilyen objektumok közvetlen lekérdezéséhez, és ezeket az eredményeket normál táblaként adja vissza.

Mik azok a tömbök és beágyazott struktúrák?

A következő objektum az Application Insightsból származik. Ebben az objektumban beágyazott struktúrák és tömbök találhatók, amelyek beágyazott struktúrákat tartalmaznak.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Séma példa tömbökre és beágyazott struktúrákra

Amikor az objektum adatkeretének sémáját (a df-et) a paranccsal df.printschemanyomtatja ki, a következő ábrázolás jelenik meg:

  • A sárga a beágyazott struktúrákat jelöli.
  • A zöld egy két elemet tartalmazó tömböt jelöl.

Kód sárga és zöld kiemeléssel, a séma eredetével

_rid, _tsés _etag hozzá lettek adva a rendszerhez, mivel a dokumentumot betöltötték az Azure Cosmos DB tranzakciós tárolójába.

Az előző adatkeretek csak 5 oszlopot és 1 sort tartalmaznak. Az átalakítás után a válogatott adatkeret 13 oszlopból és 2 sorból áll, táblázatos formátumban.

Beágyazott struktúrák simítása és tömbök robbantása

A Spark az Azure Synapse Analyticsben egyszerűen átalakíthatja a beágyazott struktúrákat oszlopokká és tömbelemekké több sorba. A megvalósításhoz kövesse az alábbi lépéseket.

Spark-átalakítások lépéseit bemutató folyamatábra

Függvény definiálása a beágyazott séma simításához

Ezt a függvényt módosítás nélkül is használhatja. Hozzon létre egy cellát egy PySpark-jegyzetfüzetben a következő függvénnyel:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

A függvény használata a beágyazott séma simításához

Ebben a lépésben az adatkeret (df) beágyazott sémáját egy új adatkeretbedf_flat () alakítja:

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

A megjelenítési függvénynek 10 oszlopot és 1 sort kell visszaadnia. A tömb és a beágyazott elemei még mindig ott vannak.

A tömb átalakítása

Ebben az esetben az adatkeretben df_flatlévő tömböt context_custom_dimensionsegy új adatkeretté df_flat_explodealakítja át. A következő kódban azt is megadhatja, hogy melyik oszlopot válassza ki:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

A megjelenítési függvénynek 10 oszlopot és 2 sort kell visszaadnia. A következő lépés a beágyazott sémák egybesimítása az 1. lépésben meghatározott függvénnyel.

A függvény használata a beágyazott séma simításához

Végül a függvénnyel egybesimíthatja az adatkeret df_flat_explodebeágyazott sémáját egy új adatkeretbe: df_flat_explode_flat

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

A megjelenítési függvénynek 13 oszlopot és 2 sort kell megjelenítenie.

Az adatkeret df_flat_explode_flat függvénye printSchema a következő eredményt adja vissza:

A végső sémát megjelenítő kód

Tömbök és beágyazott struktúrák közvetlen olvasása

Az SQL kiszolgáló nélküli modelljével lekérdezhet és létrehozhat nézeteket és táblákat az ilyen objektumokon.

Először is az adatok tárolási módjától függően a felhasználóknak az alábbi osztályozást kell használniuk. A nagybetűkben megjelenő összes elem a használati esetre jellemző:

Tömeges Formátum
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' "Parquet" (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' "CosmosDB" (Azure Synapse Link)

Cserélje le az egyes mezőket az alábbiak szerint:

  • A "FENTI TÖMEGES" az adatforrás kapcsolati sztring, amelyhez csatlakozik.
  • A "FENTI TÍPUS" a forráshoz való csatlakozáshoz használt formátum.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

A műveleteknek két különböző típusa van:

  • Az első művelettípust a következő kódsor jelzi, amely meghatározza a nevű oszlopot contextdataeventTime , amely a beágyazott elemre hivatkozik: Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Ez a sor határozza meg a nevű contextdataeventTime oszlopot, amely a beágyazott elemre hivatkozik: Context>Data>eventTime.

  • A második művelettípus új cross apply sorokat hoz létre a tömb minden eleméhez. Ezután meghatározza az egyes beágyazott objektumokat.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Ha a tömb 5 elemet tartalmaz 4 beágyazott struktúrával, az SQL kiszolgáló nélküli modellje 5 sort és 4 oszlopot ad vissza. Az SQL kiszolgáló nélküli modellje lekérdezhető, 2 sorban leképezheti a tömböt, és oszlopokba rendezheti az összes beágyazott struktúrát.

Következő lépések