分析 Azure Synapse Analytics 中的複雜資料類型

本文與 Azure Cosmos DB 的 Azure Synapse Link 中的 Parquet 檔案和容器相關。 您可以使用 Spark 或 SQL 搭配複雜的結構描述 (例如陣列或巢狀結構) 來讀取或轉換資料。 下列範例是使用單一文件所完成,但可以使用 Spark 或 SQL 輕鬆地調整為數十億份文件。 本文中所包含的程式碼會使用 PySpark (Python)。

使用案例

複雜的資料類型日益普及,並代表資料工程師所面臨的挑戰。 分析巢狀結構描述和陣列可能牽涉到曠日耗時且複雜的 SQL 查詢。 此外,重新命名或轉換巢狀資料行資料類型可能會很困難。 且當您使用深層的巢狀物件時,可能會遇到效能問題。

資料工程師必須了解如何有效率地處理複雜的資料類型,並讓每個人都能輕鬆存取這些資料類型。 在下列範例中,您會使用 Azure Synapse Analytics 中的 Spark,透過資料框架來讀取物件及將物件轉換成平面結構。 您可以使用 Azure Synapse Analytics 中的 SQL 無伺服器模型來直接查詢這類物件,並以一般資料表的形式傳回這些結果。

什麼是陣列和巢狀結構?

下列物件來自 Application Insights。 在此物件中,有巢狀結構和包含巢狀結構的陣列。

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

陣列和巢狀結構的結構描述範例

當您使用 df.printschema 命令列印物件資料框架 (稱為 df) 的結構描述時,您會看到下列標記法:

  • 黃色表示巢狀結構。
  • 綠色表示具有兩個元素的陣列。

Code with yellow and green highlighting, showing schema origin

在文件內嵌至 Azure Cosmos DB 交易式存放區時,_rid_ts_etag 已新增至系統。

先前的資料框架為 5 個資料行和只有 1 個資料行的計數。 在轉換之後,策劃資料框架會有 13 個資料行和 2 個資料列,採用表格式格式。

壓平合併巢狀結構和分裂陣列

使用 Azure Synapse Analytics 中的 Spark 時,很容易就能將巢狀結構轉換成資料行,並將陣列元素轉換成多個資料列。 使用下列步驟來實作。

Flowchart showing steps for Spark transformations

定義函式來壓平合併巢狀架構

您可以在不變更的情況下使用此函式。 在 PySpark 筆記本 中使用下列函式建立儲存格:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

使用函式來壓平合併巢狀架構

在此步驟中,您會將資料框架的巢狀架構 (df) 壓平合併成新的資料框架 (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

顯示函式應該會傳回 10 個資料行和 1 個資料列。 陣列與其巢狀元素仍然存在。

轉換陣列

在這裡,您會將 df_flat 資料框架中的陣列 context_custom_dimensions 轉換成新的資料框架 df_flat_explode。 在下列程式碼中,您也可以定義要選取的資料行:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

顯示函式應該會傳回 10 個資料行和 2 個資料列。 下一個步驟是使用步驟 1 中定義的函式,將巢狀結構描述壓平合併。

使用函式來壓平合併巢狀架構

最後,您會使用函式,將資料框架 df_flat_explode 的巢狀架構壓平合併至新的資料框架 df_flat_explode_flat 中:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

顯示函式應該會顯示 13 個資料行和 2 個資料列。

資料框架 df_flat_explode_flat 的函式 printSchema 會傳回下列結果:

Code showing the final schema

直接讀取陣列和巢狀結構

您可以使用 SQL 的無伺服器模型來查詢及建立此類物件的檢視和資料表。

首先,視資料的儲存方式而定,使用者應該使用下列分類法。 大寫顯示的所有項目都是特定於您的使用案例:

大量 格式
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' 'Parquet' (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' 'CosmosDB' (Azure Synapse Link)

取代每個欄位,如下所示:

  • 'YOUR BULK ABOVE' 是您所連線資料來源的連接字串。
  • 'YOUR TYPE ABOVE' 是您用來連線到來源的格式。
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

作業有兩種不同類型:

  • 會在下列程式碼中指出第一個作業類型,其會定義名為 contextdataeventTime 的資料行,該資料行會參考巢狀元素 Context.Data.eventTime

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    這一行會定義名為 contextdataeventTime 的資料行,該資料行會參考巢狀元素 Context>Data>eventTime

  • 第二個作業類型會使用 cross apply 來為陣列下的每個元素建立新的資料列。 然後,其會定義每個巢狀物件。

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    如果陣列有 5 個具有 4 個巢狀結構的元素,SQL 的無伺服器模型會傳回 5 個資料列和 4 個資料行。 SQL 的無伺服器模型可以就地查詢、對應 2 個資料列中的陣列,並將所有的巢狀結構都顯示在資料行中。

下一步