Eventos
Únase a nosotros en FabCon Vegas
31 mar, 23 - 2 abr, 23
El último evento dirigido por la comunidad de Microsoft Fabric, Power BI, SQL y AI. 31 de marzo al 2 de abril de 2025.
Regístrate hoyEste explorador ya no se admite.
Actualice a Microsoft Edge para aprovechar las características y actualizaciones de seguridad más recientes, y disponer de soporte técnico.
Este artículo es adecuado para archivos y contenedores de Parquet en Azure Synapse Link para Azure Cosmos DB. Puede usar Spark o SQL para leer o transformar datos con esquemas complejos como matrices o estructuras anidadas. El ejemplo siguiente se realiza con un solo documento, pero se puede escalar con facilidad a miles de millones de documentos con Spark o SQL. En el código incluido en este artículo se usa PySpark (Python).
Los tipos de datos complejos son cada vez más comunes y representan un desafío para los ingenieros de datos. El análisis de matrices y esquemas anidados puede implicar consultas SQL complejas y lentas. Además, puede ser difícil cambiar el nombre o convertir el tipo de datos de las columnas anidadas. Por otra parte, si está trabajando con objetos profundamente anidados, puede encontrar problemas de rendimiento.
Los ingenieros de datos deben entender cómo procesar de forma eficaz los tipos de datos complejos y hacer que sean fácilmente accesibles para todos. En el ejemplo siguiente se usa Synapse Spark en Azure Synapse Analytics para leer y transformar objetos en una estructura plana por medio de tramas de datos. Se usa el modelo sin servidor de SQL en Azure Synapse Analytics para consultar estos objetos directamente y devolver los resultados como una tabla convencional.
El objeto siguiente procede de Application Insights. En este objeto hay estructuras anidadas y matrices que contienen estructuras anidadas.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Al imprimir el esquema de la trama de datos del objeto (denominado df) con el comando df.printschema
, se ve la siguiente representación:
_rid
, _ts
y _etag
se han agregado al sistema a medida que el documento se ingería en el almacén transaccional de Azure Cosmos DB.
En la trama de datos anterior solo hay 5 columnas y 1 fila. Después de la transformación, la trama de datos seleccionada tendrá 13 columnas y 2 filas en un formato tabular.
Con Spark en Azure Synapse Analytics, resulta sencillo transformar estructuras anidadas en columnas y elementos de matriz en varias filas. Use los pasos siguientes para la implementación.
Puede usar esta función sin cambiar. Cree una celda en un cuaderno de PySpark con la siguiente función:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
En este paso, se acopla el esquema anidado de la trama de datos (df) en una nueva trama de datos (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
La función display debe devolver 10 columnas y 1 fila. La matriz y sus elementos anidados todavía están allí.
Aquí se transforma la matriz, context_custom_dimensions
, de la trama de datos df_flat
, en una nueva trama de datos df_flat_explode
. En el código siguiente también se define la columna que se va a seleccionar:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
La función display debe devolver 10 columnas y 2 filas. El siguiente paso consiste en acoplar los esquemas anidados con la función definida en el paso 1.
Por último, se usa la función para acoplar el esquema anidado de la trama de datos df_flat_explode
en una nueva trama de datos df_flat_explode_flat
:
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
La función display debe mostrar 13 columnas y 2 filas.
La función printSchema
de la trama de datos df_flat_explode_flat
devuelve el siguiente resultado:
Con el modelo sin servidor de SQL, puede consultar y crear vistas y tablas a través de dichos objetos.
En primer lugar, en función de cómo se hayan almacenado los datos, los usuarios deben usar la siguiente taxonomía. Todo lo que aparece en mayúsculas es específico del caso de uso:
En bloque | Formato |
---|---|
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' | "Parquet" (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' | 'CosmosDB' (Azure Synapse Link) |
Reemplace cada campo de esta manera:
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Hay dos tipos distintos de operaciones:
El primer tipo de operación se indica en la siguiente línea de código, que define la columna denominada contextdataeventTime
que hace referencia al elemento anidado, Context.Data.eventTime
.
contextdataeventTime varchar(50) '$.context.data.eventTime'
En esta línea se define la columna denominada contextdataeventTime
que hace referencia al elemento anidado, Context>Data>eventTime
.
El segundo tipo de operación utiliza cross apply
para crear nuevas filas para cada elemento de la matriz. A continuación, define cada objeto anidado.
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Si la matriz tuviera 5 elementos con 4 estructuras anidadas, el modelo sin servidor de SQL devolvería 5 filas y 4 columnas. El modelo sin servidor de SQL sin servidor puede consultar en contexto, asignar la matriz en 2 filas y mostrar todas las estructuras anidadas en columnas.
Eventos
Únase a nosotros en FabCon Vegas
31 mar, 23 - 2 abr, 23
El último evento dirigido por la comunidad de Microsoft Fabric, Power BI, SQL y AI. 31 de marzo al 2 de abril de 2025.
Regístrate hoy