Sdílet prostřednictvím


Analýza složitých datových typů v Azure Synapse Analytics

Tento článek se týká souborů a kontejnerů Parquet ve službě Azure Synapse Link pro Azure Cosmos DB. Pomocí Sparku nebo SQL můžete číst nebo transformovat data pomocí složitých schémat, jako jsou pole nebo vnořené struktury. Následující příklad je dokončený s jedním dokumentem, ale pomocí Sparku nebo SQL se dá snadno škálovat na miliardy dokumentů. Kód zahrnutý v tomto článku používá PySpark (Python).

Případ použití

Složité datové typy jsou stále častější a představují výzvu pro datové inženýry. Analýza vnořených schémat a polí může zahrnovat časově náročné a složité dotazy SQL. Kromě toho může být obtížné přejmenovat nebo přetypovat datový typ vnořených sloupců. Při práci s hluboce vnořenými objekty můžete také narazit na problémy s výkonem.

Datoví inženýři potřebují pochopit, jak efektivně zpracovávat složité datové typy a zajistit, aby byly snadno přístupné všem uživatelům. V následujícím příkladu použijete Spark v Azure Synapse Analytics ke čtení a transformaci objektů do ploché struktury prostřednictvím datových rámců. Bezserverový model SQL v Azure Synapse Analytics můžete použít k přímému dotazování takových objektů a vrácení těchto výsledků jako běžné tabulky.

Co jsou pole a vnořené struktury?

Následující objekt pochází z Application Insights. V tomto objektu jsou vnořené struktury a pole, které obsahují vnořené struktury.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Příklad schématu polí a vnořených struktur

Při tisku schématu datového rámce objektu (označovaného jako df) pomocí příkazu df.printschemase zobrazí následující znázornění:

  • Žlutá představuje vnořené struktury.
  • Zelená představuje pole se dvěma prvky.

Kód se žlutým a zeleným zvýrazněním, který zobrazuje původ schématu

_rid, _tsa _etag byly přidány do systému při ingestování dokumentu do transakčního úložiště služby Azure Cosmos DB.

Předchozí datový rámec se počítá pouze pro 5 sloupců a 1 řádek. Po transformaci bude mít kurátorovaný datový rámec 13 sloupců a 2 řádky v tabulkovém formátu.

Zploštění vnořených struktur a rozložení polí

Díky Sparku v Azure Synapse Analytics je snadné transformovat vnořené struktury na sloupce a prvky pole do více řádků. K implementaci použijte následující kroky.

Vývojový diagram znázorňující kroky pro transformace Sparku

Definování funkce pro zploštění vnořeného schématu

Tuto funkci můžete použít beze změny. Vytvořte buňku v poznámkovém bloku PySpark pomocí následující funkce:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Použití funkce ke zploštění vnořeného schématu

V tomto kroku zploštěte vnořené schéma datového rámce (df) do nového datového rámce (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Funkce zobrazení by měla vrátit 10 sloupců a 1 řádek. Pole a jeho vnořené prvky jsou stále k dispozici.

Transformace pole

Tady transformujete pole context_custom_dimensionsv datovém rámci df_flatna nový datový rámec df_flat_explode. V následujícím kódu také definujete, který sloupec se má vybrat:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Funkce zobrazení by měla vrátit 10 sloupců a 2 řádky. Dalším krokem je zploštět vnořená schémata s funkcí definovanou v kroku 1.

Použití funkce ke zploštění vnořeného schématu

Nakonec pomocí funkce zploštěte vnořené schéma datového rámce df_flat_explodedo nového datového rámce df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Funkce zobrazení by měla zobrazit 13 sloupců a 2 řádky.

Funkce printSchema datového rámce df_flat_explode_flat vrátí následující výsledek:

Kód znázorňující konečné schéma

Přímé čtení polí a vnořených struktur

Pomocí bezserverového modelu SQL můžete na tyto objekty dotazovat a vytvářet zobrazení a tabulky.

Za prvé, v závislosti na tom, jak jsou data uložena, by uživatelé měli použít následující taxonomii. Všechno, co se zobrazuje velkými písmeny, je specifické pro vaše případy použití:

Hromadné ingestování Formát
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' "Parquet" (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' CosmosDB (odkaz na Azure Synapse)

Jednotlivá pole nahraďte následujícím způsobem:

  • "VAŠE HROMADNÉ NAD" je připojovací řetězec zdroje dat, ke kterému se připojujete.
  • "VÁŠ TYP NAD" je formát, který používáte pro připojení ke zdroji.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Existují dva různé typy operací:

  • První typ operace je uveden v následujícím řádku kódu, který definuje sloupec s názvem contextdataeventTime , který odkazuje na vnořený element . Context.Data.eventTime

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Tento řádek definuje sloupec s názvem contextdataeventTime , který odkazuje na vnořený prvek Context>Data>eventTime.

  • Druhý typ operace používá cross apply k vytvoření nových řádků pro každý prvek v poli. Pak definuje každý vnořený objekt.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Pokud pole obsahovalo 5 prvků se 4 vnořenými strukturami, vrátí bezserverový model SQL 5 řádků a 4 sloupce. Bezserverový model SQL může dotazovat na místo, mapovat pole ve 2 řádcích a zobrazit všechny vnořené struktury do sloupců.

Další kroky