Поделиться через


Анализ сложных типов данных в Azure Synapse Analytics

Сведения этой статьи относятся к файлам и контейнерам Parquet в Azure Synapse Link для Azure Cosmos DB. Spark или SQL можно использовать для чтения или преобразования данных со сложными схемами, такими как массивы или вложенные структуры. Результатом следующего примера является один документ, который можно легко масштабировать в миллиарды документов с помощью Spark или SQL. Код, прилагаемый к этой статье, использует PySpark (Python).

Вариант использования

Сложные типы данных встречаются все чаще и представляют трудности для специалистов по обработке данных. При анализе вложенной схемы и массивов обычно используются длительно выполняющиеся и сложные SQL-запросы. Кроме того, переименовать или привести тип данных вложенных столбцов может оказаться очень сложно. При работе с глубоко вложенными объектами также могут возникнуть проблемы с производительностью.

Специалистам по обработке данных необходимо понять, как организовать эффективную обработку сложных типов данных и сделать их доступными для всех. В следующем примере для чтения и преобразования объектов в плоскую структуру через кадры данных используется Spark в Azure Synapse Analytics. Используйте бессерверную модель SQL в Azure Synapse Analytics, чтобы напрямую запрашивать такие объекты и возвращать эти результаты в виде обычной таблицы.

Что собой представляют массивы и вложенные структуры?

Следующий объект взят из Application Insights. В этом объекте есть вложенные структуры и массивы, содержащие вложенные структуры.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Пример схемы массивов и вложенных структур

При печати схемы кадра данных объекта (называется df) с помощью команды df.printschema вы увидите следующее представление:

  • Желтый цвет обозначает вложенные структуры.
  • Зеленый цвет обозначает массив с двумя элементами.

Код с выделениями желтого и зеленого цвета, показывающий источник схемы

_rid, _ts и _etag добавлены в систему во время приема документа в хранилище транзакций Azure Cosmos DB.

Приведенный кадр данных подсчитывает только 5 столбцов и одну строку. После преобразования кадр курированных данных будет содержать 13 столбцов и 2 строки в табличном формате.

Преобразование в плоскую структуру вложенных структур и развертывание массивов

С помощью Spark в Azure Synapse Analytics можно с легкостью преобразовывать вложенные структуры в столбцы, а элементы массива — в несколько строк. Для реализации выполните следующие шаги.

Блок-схема с шагами для преобразований с помощью Spark

Определение функции для преобразования вложенной схемы в плоскую структуру

Эту функцию можно использовать без изменения. Создайте ячейку в записной книжке PySpark, используя следующую функцию:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Использование функции для преобразования вложенной схемы в плоскую структуру

На этом шаге выполняется преобразование в плоскую структуру вложенной схемы кадра данных (df) в новый кадр данных (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Функция отображения должна вернуть 10 столбцов и 1 строку. Массив и его вложенные элементы все еще существуют.

Преобразование массива

В этом случае вы преобразуете массив context_custom_dimensions в кадре данных df_flat в новый кадр данных df_flat_explode. В следующем коде вы также определите, какой столбец следует выбрать:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Функция отображения должна вернуть 10 столбцов и 2 строки. Следующим шагом будет преобразование в плоскую структуру вложенных схем с помощью функции, определенной на шаге 1.

Использование функции для преобразования вложенной схемы в плоскую структуру

Наконец, используйте функцию, чтобы преобразовать в плоскую структуру вложенную схему кадра данных df_flat_explode в новый кадр данных df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Функция отображения должна показать 13 столбцов и 2 строки.

Функция printSchema кадра данных df_flat_explode_flat возвращает следующий результат:

Код, демонстрирующий окончательную схему

Чтение массивов и вложенных структур напрямую

С помощью бессерверной модели SQL можно запрашивать и создавать представления и таблицы на таких объектах.

Во первых, в зависимости от способа хранения данных пользователи должны использовать следующую таксономию. Все, что указано прописными буквами, относится к вашему варианту использования:

Массовость Формат
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' Parquet (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' CosmosDB (Azure Synapse Link)

Замените каждое поле следующим образом:

  • YOUR BULK ABOVE — строка подключения к источнику данных, к которому вы подключаетесь.
  • YOUR TYPE ABOVE — формат, используемый для подключения к источнику.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Существует два разных типа операций:

  • Первый тип операции указывается в следующей строке кода, определяющей столбец с именем contextdataeventTime, который ссылается на вложенный элемент Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Эта строка определяет столбец с именем contextdataeventTime, который ссылается на вложенный элемент Context>Data>eventTime.

  • Второй тип операции использует cross apply, чтобы создать строки для каждого элемента в массиве. Затем он определяет каждый вложенный объект.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Если массив содержит 5 элементов с 4 вложенными структурами, то бессерверная модель SQL возвращает 5 строк и 4 столбца. Бессерверная модель SQL может выполнять запросы на месте, сопоставлять массив в двух строках и отображать все вложенные структуры в столбцах.

Дальнейшие действия