تحليل أنواع البيانات المعقدة في Azure Synapse Analytics

هذه المقالة ذات صلة بملفات وحاويات Parquet في Azure Synapse Link for Azure Cosmos DB. يمكنك استخدام Spark أو SQL لقراءة البيانات أو تحويلها باستخدام مخططات معقدة مثل الصفيفات أو الهياكل المتداخلة. يتم استكمال المثال التالي بمستند واحد، ولكن يمكن بسهولة توسيع نطاقه ليشمل مليارات المستندات باستخدام Spark أو SQL. التعليمة البرمجية المضمن في هذه المقالة يستخدم PySpark (Python).

حالة الاستخدام

أصبحت أنواع البيانات المعقدة شائعة بشكل متزايد وتمثل تحدياً لمهندسي البيانات. يمكن أن يتضمن تحليل الصفيفات والمخططات المتداخلة استعلامات SQL معقدة وتستغرق وقتاً طويلاً. بالإضافة إلى ذلك، قد يكون من الصعب إعادة تسمية نوع بيانات الأعمدة المتداخلة أو إرساله. أيضاً، عند العمل مع عناصر متداخلة بشدة، قد تواجه مشكلات في الأداء.

يحتاج مهندسو البيانات إلى فهم كيفية معالجة أنواع البيانات المعقدة بكفاءة وجعلها في متناول الجميع. في المثال التالي، يمكنك استخدام Spark في Azure Synapse Analytics لقراءة العناصر وتحويلها إلى بنية مسطحة من خلال إطارات البيانات. يمكنك استخدام نموذج SQL بلا خادم في Azure Synapse Analytics للاستعلام عن هذه العناصر مباشرةً، وإرجاع هذه النتائج كجدول عادي.

ما الصفيفات والبنى المتداخلة؟

يأتي العنصر التالي من Application Insights. في هذا العنصر، توجد هياكل وصفيفات متداخلة تحتوي على بنى متداخلة.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

مثال على مخطط الصفيفات والهياكل المتداخلة

عند طباعة مخطط إطار بيانات العنصر (المسمى df) باستخدام الأمر df.printschema، سترى التمثيل التالي:

  • يمثل اللون الأصفر الهياكل المتداخلة.
  • يمثل اللون الأخضر صفيف مكونة من عنصرين.

التعليمة البرمجية ذات التمييز باللونين الأصفر والأخضر، يُظهر أصل المخطط

تمت إضافة _rid و_ts و_etag إلى النظام حيث تم إدخال المستند في مخزن عمليات Azure Cosmos DB.

يتم حساب إطار البيانات السابق لـ 5 أعمدة وصف واحد فقط. بعد التحويل، سيحتوي إطار البيانات المنسق على 13 عموداً وصفين، بتنسيق جدولي.

قم بتسوية الهياكل المتداخلة وتفجير الصفيفات

باستخدام Spark في Azure Synapse Analytics، من السهل تحويل الهياكل المتداخلة إلى أعمدة وعناصر صفيف إلى صفوف متعددة. استخدم الخطوات التالية للتنفيذ.

مخطط انسيابي يعرض خطوات تحويلات شرارة

حدد وظيفة لتسوية المخطط المتداخل

يمكنك استخدام هذه الوظيفة دون تغيير. أنشئ خلية في PySpark notebook بالوظيفة التالية:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

استخدم الوظيفة لتسوية المخطط المتداخل

في هذه الخطوة، تقوم بتسوية المخطط المتداخل لإطار البيانات (df) في إطار بيانات جديد (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

يجب أن تقوم وظيفة العرض بإرجاع 10 أعمدة وصف واحد. الصفيف وعناصرها المتداخلة لا تزال موجودة.

تحويل الصفيف

هنا، تقوم بتحويل الصفيف، context_custom_dimensions، في إطار البيانات df_flat، إلى إطار بيانات جديد df_flat_explode. في التعليمة البرمجية التالي، يمكنك أيضاً تحديد العمود المراد تحديده:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

يجب أن تقوم وظيفة العرض بإرجاع 10 أعمدة وصفين. الخطوة التالية هي تسوية المخططات المتداخلة بالوظيفة المحددة في الخطوة 1.

استخدم الوظيفة لتسوية المخطط المتداخل

أخيراً، يمكنك استخدام الوظيفة لتسوية المخطط المتداخل لإطار البيانات df_flat_explode، في إطار بيانات جديد، df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

يجب أن تُظهر وظيفة العرض 13 عموداً وصفين.

تعرض الوظيفة printSchema لإطار البيانات df_flat_explode_flat النتيجة التالية:

تعليمة برمجية يظهر المخطط النهائي

اقرأ الصفيفات والبنى المتداخلة مباشرةً

باستخدام نموذج SQL بلا خادم، يمكنك الاستعلام وإنشاء طرق عرض وجداول حول هذه العناصر.

أولاً، بناءً على كيفية تخزين البيانات، يجب على المستخدمين استخدام التصنيف التالي. كل شيء يظهر بأحرف كبيرة خاص بحالة الاستخدام الخاصة بك:

حجم تنسيق
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' "Parquet" (ADLSg2)
N'endpoint =https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/؛ الحساب = ACCOUNTNAME؛ قاعدة البيانات = DATABASENAME؛ المجموعة = COLLECTIONNAME؛ المنطقة = REGIONTOQUERY '، SECRET =' YOURSECRET ' "CosmosDB" (رابط Azure Synapse)

استبدل كل حقل على النحو التالي:

  • "YOUR BULK ABOVE" هو سلسلة الاتصال لمصدر البيانات الذي تتصل به.
  • "YOUR TYPE ABOVE" هو التنسيق الذي تستخدمه للاتصال بالمصدر.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

هناك نوعان مختلفان من العمليات:

  • يُشار إلى نوع العملية الأول في السطر التالي من التعليمة البرمجية، والذي يحدد العمود المسمى contextdataeventTime والذي يشير إلى العنصر المتداخل، Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    يحدد هذا السطر العمود المسمى contextdataeventTime والذي يشير إلى العنصر المتداخل، Context>Data>eventTime.

  • يستخدم نوع العملية الثاني cross apply لإنشاء صفوف جديدة لكل عنصر تحت الصفيف. ثم يقوم بتعريف كل عنصر متداخل.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    إذا كان الصفيف يحتوي على 5 عناصر مع 4 بنى متداخلة، فإن نموذج SQL بلا خادم يُرجع 5 صفوف و4 أعمدة. يمكن لنموذج SQL بلا خادم الاستعلام في مكانه، وتعيين الصفيف في صفين، وعرض جميع الهياكل المتداخلة في أعمدة.

الخطوات التالية