Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Inferir y evolucionar el esquema usando
Importante
Esta característica está en la versión preliminar pública.
En este artículo se describe cómo deducir y evolucionar el esquema de blobs JSON con la from_json
función SQL en Canalizaciones declarativas de Lakeflow.
Información general
La from_json
función SQL analiza una columna de cadena JSON y devuelve un valor de estructura. Cuando se usa fuera de las canalizaciones declarativas de Lakeflow, debe proporcionar explícitamente el esquema del valor devuelto mediante el argumento schema
. Cuando se utilizan con las canalizaciones declarativas de Lakeflow, se puede habilitar la inferencia y evolución del esquema, que administra automáticamente el esquema del valor devuelto. Esta característica simplifica la configuración inicial (especialmente cuando se desconoce el esquema) y las operaciones en curso cuando el esquema cambia con frecuencia. Permite el procesamiento sin problemas de blobs JSON arbitrarios de orígenes de datos de streaming, como Auto Loader, Kafka o Kinesis.
En concreto, cuando se usa la función from_json
SQL en canalizaciones declarativas de Lakeflow, la inferencia de esquemas y la evolución pueden:
- Detección de nuevos campos en registros JSON entrantes (incluidos los objetos JSON anidados)
- Inferir los tipos de campo y asignarlos a los tipos de datos de Spark adecuados
- Evolucione automáticamente el esquema para dar cabida a nuevos campos
- Controlar automáticamente los datos que no se ajustan al esquema actual
Sintaxis: inferir y evolucionar automáticamente el esquema
Si usa from_json
con canalizaciones declarativas de Lakeflow, puede deducir y evolucionar automáticamente el esquema. Para habilitarlo, establezca el esquema en NULL y especifique la schemaLocationKey
opción. Esto le permite deducir y realizar un seguimiento del esquema.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Pitón
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Una consulta puede tener varias from_json
expresiones, pero cada expresión debe tener un único schemaLocationKey
. El schemaLocationKey
también debe ser único por canalización.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pitón
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Sintaxis: esquema fijo
Si quiere aplicar un esquema determinado en su lugar, puede usar la siguiente from_json
sintaxis para analizar la cadena JSON mediante ese esquema:
from_json(jsonStr, schema, [, options])
Esta sintaxis se puede usar en cualquier entorno de Azure Databricks, incluidas las canalizaciones declarativas de Lakeflow. Puede encontrar más información disponible aquí.
Inferencia de esquemas
from_json
deduce el esquema del primer lote de columnas de datos JSON y lo indexa internamente por su schemaLocationKey
(obligatorio).
Si la cadena JSON es un único objeto (por ejemplo, {"id": 123, "name": "John"}
), from_json
deduce un esquema de tipo STRUCT y agrega un rescuedDataColumn
elemento a la lista de campos.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Sin embargo, si la cadena JSON tiene una matriz de nivel superior (como ["id": 123, "name": "John"]
), from_json
ajusta la matriz en un STRUCT. Este enfoque permite la recución de datos que no son compatibles con el esquema inferido. Tiene la opción de explotar los valores de la matriz en filas independientes de bajada.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Invalidación de la inferencia de esquema mediante sugerencias de esquema
Opcionalmente, puede proporcionar schemaHints
para influir en cómo from_json
deduce el tipo de una columna. Esto resulta útil cuando sabe que una columna es de un tipo de datos específico o si desea elegir un tipo de datos más general (por ejemplo, un doble en lugar de un entero). Puede proporcionar un número arbitrario de sugerencias para los tipos de datos de columna mediante la sintaxis de especificación del esquema SQL. La semántica de las sugerencias de esquema es la misma que para las sugerencias de esquema del cargador automático. Por ejemplo:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Cuando la cadena JSON contiene una MATRIZ de nivel superior, se encapsula en un STRUCT. En estos casos, las sugerencias de esquema se aplican al esquema ARRAY en lugar del STRUCT ajustado. Por ejemplo, considere una cadena JSON con una matriz de nivel superior, como:
[{"id": 123, "name": "John"}]
El esquema ARRAY inferido se ajusta en un STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Para cambiar el tipo de datos de id
, especifique la sugerencia de esquema como element.id
STRING. Para agregar una nueva columna de tipo DOUBLE, especifique element.new_col
DOUBLE. Debido a estas sugerencias, el esquema de la matriz JSON de nivel superior se convierte en:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Evolución del esquema mediante schemaEvolutionMode
from_json
detecta la adición de nuevas columnas a medida que procesa los datos. Cuando from_json
detecta un nuevo campo, actualiza el esquema inferido con el esquema más reciente mediante la combinación de nuevas columnas al final del esquema. Los tipos de datos de las columnas existentes permanecen sin cambios. Después de la actualización del esquema, la canalización se reinicia automáticamente con el esquema actualizado.
from_json
admite los siguientes modos para la evolución del esquema, que se establecen en el valor schemaEvolutionMode
opcional. Estos modos son coherentes con el cargador automático.
schemaEvolutionMode |
Comportamiento al leer una nueva columna |
---|---|
addNewColumns (valor predeterminado) |
Se produce un error en la secuencia. Se agregan nuevas columnas al esquema. Las columnas existentes no evolucionan tipos de datos. |
rescue |
El esquema nunca evoluciona y la secuencia no genera errores debido a cambios de esquema. Todas las columnas nuevas se registran en la columna de datos rescatados. |
failOnNewColumns |
Se produce un error en la secuencia. La transmisión no se reinicia a menos que schemaHints se actualice o se eliminen los datos problemáticos. |
none |
No evoluciona el esquema, las nuevas columnas se omiten y los datos no se rescaten a menos que se rescuedDataColumn establezca la opción. No se produce un error en la secuencia debido a cambios de esquema. |
Por ejemplo:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Columna de datos rescatados
Una columna de datos rescate se agrega automáticamente al esquema como _rescued_data
. Puede cambiar el nombre de la columna estableciendo la rescuedDataColumn
opción. Por ejemplo:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Cuando decide usar la columna de datos rescate, las columnas que no coincidan con el esquema inferido se rescatarán en lugar de quitarse. Esto puede ocurrir debido a una falta de coincidencia de tipo de datos, una columna que falta en el esquema o una diferencia de mayúsculas y minúsculas de nombre de columna.
Control de registros dañados
Para almacenar registros con formato incorrecto y no se pueden analizar, agregue una _corrupt_record
columna estableciendo sugerencias de esquema, como en el ejemplo siguiente:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Para cambiar el nombre de la columna de registro dañado, establezca la columnNameOfCorruptRecord
opción.
El analizador JSON admite tres modos para controlar registros dañados:
Modo | Descripción |
---|---|
PERMISSIVE |
En el caso de los registros dañados, coloca la cadena con formato incorrecto en un campo configurado por columnNameOfCorruptRecord y establece campos con formato incorrecto en null . Para mantener registros dañados, puede establecer un campo de tipo de cadena denominado columnNameOfCorruptRecord en un esquema definido por el usuario. Si un esquema no tiene el campo, los registros dañados se quitan durante el análisis. Al inferir un esquema, el analizador sintáctico añade columnNameOfCorruptRecord implícitamente un campo en el esquema de salida. |
DROPMALFORMED |
Omite los registros dañados. Cuando se usa DROPMALFORMED el modo con rescuedDataColumn , los errores de coincidencia de tipos de datos no hacen que se quiten los registros. Solo se quitan los registros dañados, como JSON incompleto o con formato incorrecto. |
FAILFAST |
Produce una excepción cuando el analizador cumple los registros dañados. Cuando se usa FAILFAST el modo con rescuedDataColumn , los errores de coincidencia de tipos de datos no producen un error. Solo los registros dañados producen errores, como JSON incompleto o con formato incorrecto. |
Hacer referencia a un campo en la salida de from_json
from_json
deduce el esquema durante la ejecución de la canalización. Si una consulta de bajada hace referencia a un from_json
campo antes de que la from_json
función se haya ejecutado correctamente al menos una vez, el campo no se resuelve y se omite la consulta. En el ejemplo siguiente, se omitirá el análisis de la consulta de tabla silver hasta que la from_json
función de la consulta bronze se haya ejecutado e inferido el esquema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Si la funciónfrom_json
y los campos a los que se hace referencia se hacen referencia en la misma consulta, el análisis podría producir un error como en el ejemplo siguiente:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Para corregirlo, mueva la referencia al from_json
campo a una consulta de bajada (como el ejemplo bronce/plata anterior). Como alternativa, puede especificar schemaHints
que contenga los campos a los que se hace referencia from_json
. Por ejemplo:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Ejemplos: inferir y evolucionar automáticamente el esquema
En esta sección se proporciona código de ejemplo para habilitar la inferencia automática de esquemas y la evolución utilizando from_json
en las canalizaciones declarativas de Lakeflow.
Creación de una tabla de streaming desde el almacenamiento de objetos en la nube
En el ejemplo siguiente se usa read_files
la sintaxis para crear una tabla de streaming desde el almacenamiento de objetos en la nube.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pitón
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Creación de una tabla de streaming desde Kafka
En el ejemplo siguiente se usa read_kafka
la sintaxis para crear una tabla de streaming desde Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Pitón
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Ejemplos: esquema fijo
Para obtener código de ejemplo que usa from_json
con un esquema fijo, consulte from_json
función.
Preguntas más frecuentes
En esta sección se responden las preguntas más frecuentes sobre la inferencia de esquemas y la compatibilidad con la evolución en la from_json
función.
¿Cuál es la diferencia entre from_json
y parse_json
?
La parse_json
función devuelve un VARIANT
valor de la cadena JSON.
VARIANT proporciona una manera flexible y eficaz de almacenar datos semiestructurados. Esto evita la inferencia de esquemas y la evolución al evitar por completo los tipos estrictos. Sin embargo, si desea aplicar un esquema en tiempo de escritura (por ejemplo, porque tiene un esquema relativamente estricto), from_json
podría ser una opción mejor.
En la tabla siguiente se describen las diferencias entre from_json
y parse_json
:
Función | Casos de uso | Disponibilidad |
---|---|---|
from_json |
La evolución del esquema con from_json mantiene el esquema. Esto es útil cuando:
|
Disponible solo con la inferencia de esquemas y evolución en las canalizaciones declarativas de Lakeflow |
parse_json |
VARIANT es especialmente adecuado para contener datos que no es necesario esquematizar. Por ejemplo:
|
Disponible con o sin canalizaciones declarativas de Lakeflow |
¿Puedo usar la inferencia de esquemas y la sintaxis de evolución fuera de las canalizaciones declarativas de Lakeflow?
No, no puedes usar from_json
la inferencia de esquema y la sintaxis de evolución fuera de las canalizaciones declarativas de Lakeflow.
¿Cómo se accede al esquema inferido por from_json
?
Vea el esquema de la tabla de streaming de destino.
¿Puedo pasar from_json
un esquema y también hacer la evolución?
No, no se puede pasar from_json
un esquema y también realizar la evolución. Sin embargo, puede proporcionar sugerencias de esquema para invalidar algunos o todos los campos inferidos por from_json
.
¿Qué ocurre con el esquema si la tabla se actualiza completamente?
Las ubicaciones de esquema asociadas a la tabla se borran y el esquema se vuelve a deducir desde cero.