Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Inférer et faire évoluer le schéma à l’aide de
Important
Cette fonctionnalité est en version préliminaire publique.
Cet article explique comment déduire et faire évoluer le schéma des objets blob JSON avec la from_json fonction SQL dans les pipelines déclaratifs Spark Lakeflow.
Aperçu
La from_json fonction SQL analyse une colonne de chaîne JSON et retourne une valeur de struct. Lorsqu’il est utilisé en dehors d’un pipeline, vous devez fournir explicitement le schéma de la valeur retournée à l’aide de l’argument schema . Lorsqu’il est utilisé avec les pipelines déclaratifs Spark Lakeflow, vous pouvez activer l’inférence et l’évolution du schéma, qui gère automatiquement le schéma de la valeur retournée. Cette fonctionnalité simplifie à la fois la configuration initiale (en particulier lorsque le schéma est inconnu) et les opérations en cours lorsque le schéma change fréquemment. Il permet de traiter en toute transparence des objets blob JSON arbitraires à partir de sources de données de streaming telles que Auto Loader, Kafka ou Kinesis.
Plus précisément, lorsqu’elle est utilisée dans un pipeline, l’inférence de schéma et l’évolution pour la fonction SQL from_json peuvent :
- Détecter de nouveaux champs dans les enregistrements JSON entrants (y compris les objets JSON imbriqués)
- Déduire les types de champs et les mapper aux types de données Spark appropriés
- Faire évoluer automatiquement le schéma pour prendre en charge les nouveaux champs
- Gérer automatiquement les données qui ne sont pas conformes au schéma actuel
Syntaxe : déduire et faire évoluer automatiquement le schéma
Pour activer l’inférence de schéma avec from_json dans un pipeline, définissez le schéma sur NULL et spécifiez l’option schemaLocationKey. Cela lui permet d’inférer et de suivre le schéma.
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
Python
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
Une requête peut avoir plusieurs from_json expressions, mais chaque expression doit avoir une expression unique schemaLocationKey. Il schemaLocationKey doit également être unique pour chaque pipeline.
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Syntaxe : schéma fixe
Si vous souhaitez appliquer un schéma particulier à la place, vous pouvez utiliser la syntaxe suivante from_json pour analyser la chaîne JSON à l’aide de ce schéma :
from_json(jsonStr, schema, [, options])
Cette syntaxe peut être utilisée dans n’importe quel environnement Azure Databricks, y compris les pipelines déclaratifs Spark Lakeflow. Plus d’informations sont disponibles ici.
Inférence de schéma
from_json déduit le schéma du premier lot de colonnes de données JSON et l’indexe en interne par son schemaLocationKey (obligatoire).
Si la chaîne JSON est un objet unique (par exemple, {"id": 123, "name": "John"}), from_json déduit un schéma de type STRUCT et ajoute une rescuedDataColumn à la liste des champs.
STRUCT<id LONG, name STRING, _rescued_data STRING>
Toutefois, si la chaîne JSON a un tableau de niveau supérieur (par exemple ["id": 123, "name": "John"]), from_json encapsule le TABLEAU dans un STRUCT. Cette approche permet de sauver des données incompatibles avec le schéma déduit. Vous avez la possibilité de décomposer les valeurs du tableau en lignes distinctes plus bas.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Remplacer l’inférence de schéma à l’aide d’indicateurs de schéma
Vous pouvez éventuellement fournir schemaHints pour influencer la manière dont from_json déduit le type d'une colonne. Cela est utile lorsque vous savez qu’une colonne est d’un type de données spécifique ou si vous souhaitez choisir un type de données plus général (par exemple, un double au lieu d’un entier). Vous pouvez fournir un nombre arbitraire d’indicateurs pour les types de données de colonne à l’aide de la syntaxe de spécification de schéma SQL. La sémantique des indicateurs de schéma est la même que pour les indicateurs de schéma du chargeur automatique. Par exemple:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
Lorsque la chaîne JSON contient un TABLEAU de niveau supérieur, elle est encapsulée dans une STRUCTURE. Dans ces cas, les indicateurs de schéma sont appliqués au schéma ARRAY au lieu du STRUCT encapsulé. Par exemple, considérez une chaîne JSON avec un tableau de niveau supérieur, par exemple :
[{"id": 123, "name": "John"}]
Le schéma ARRAY déduit est encapsulé dans un STRUCT :
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
Pour modifier le type de données de id, spécifiez l’indice de schéma sous la forme element.id STRING. Pour ajouter une nouvelle colonne de type DOUBLE, spécifiez element.new_col DOUBLE. En raison de ces indicateurs, le schéma du tableau JSON de niveau supérieur devient :
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
Faire évoluer le schéma à l’aide de schemaEvolutionMode
from_json détecte l’ajout de nouvelles colonnes au fur et à mesure qu’elle traite vos données. Lorsqu’un nouveau champ est détecté par from_json, le schéma déduit est mis à jour avec le schéma le plus récent en ajoutant les nouvelles colonnes à la fin du schéma. Les types de données des colonnes existantes restent inchangés. Après la mise à jour du schéma, le pipeline redémarre automatiquement avec le schéma mis à jour.
from_json prend en charge les modes suivants pour l’évolution du schéma, que vous définissez à l’aide du paramètre facultatif schemaEvolutionMode . Ces modes sont cohérents avec le chargeur automatique.
schemaEvolutionMode |
Comportement lors de la lecture d’une nouvelle colonne |
|---|---|
addNewColumns (valeur par défaut) |
Échec du flux. De nouvelles colonnes sont ajoutées au schéma. Les colonnes existantes n’évoluent pas dans les types de données. |
rescue |
Le schéma n’est jamais évolué et le flux n’échoue pas en raison des modifications de schéma. Toutes les nouvelles colonnes sont enregistrées dans la colonne de données sauvée. |
failOnNewColumns |
Échec du flux. Stream ne redémarre pas, sauf si les schemaHints données sont mises à jour ou si les données incriminées sont supprimées. |
none |
N’évolue pas le schéma, les nouvelles colonnes sont ignorées et les données ne sont pas sauvées, sauf si l’option rescuedDataColumn est définie. Le flux n’échoue pas en raison des modifications de schéma. |
Par exemple:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
Colonne de données sauvées
Une colonne de données sauvée est automatiquement ajoutée à votre schéma en tant que _rescued_data. Vous pouvez renommer la colonne en définissant l’option rescuedDataColumn . Par exemple:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
Lorsque vous choisissez d’utiliser la colonne de données sauvée, toutes les colonnes qui ne correspondent pas au schéma déduit sont sauvées au lieu d’être supprimées. Cela peut se produire en raison d’une incompatibilité de type de données, d’une colonne manquante dans le schéma ou d’une différence de casse de nom de colonne.
Gérer les enregistrements endommagés
Pour stocker les enregistrements mal formés et ne peuvent pas être analysés, ajoutez une _corrupt_record colonne en définissant des indicateurs de schéma, comme dans l’exemple suivant :
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Pour renommer la colonne d’enregistrement endommagée, définissez l’option columnNameOfCorruptRecord .
L’analyseur JSON prend en charge trois modes de gestion des enregistrements endommagés :
| Mode | Descriptif |
|---|---|
PERMISSIVE |
Pour les enregistrements endommagés, place la chaîne mal formée dans un champ configuré par columnNameOfCorruptRecord et définit les champs mal formés sur null. Pour conserver les enregistrements endommagés, vous pouvez définir un champ de type chaîne nommé columnNameOfCorruptRecord dans un schéma défini par l’utilisateur. Si un schéma n’a pas le champ, les enregistrements endommagés sont supprimés pendant l’analyse. Lors de l’inférence d’un schéma, l’analyseur ajoute implicitement un columnNameOfCorruptRecord champ dans le schéma de sortie. |
DROPMALFORMED |
Ignore les enregistrements endommagés. Lorsque vous utilisez DROPMALFORMED le mode avec rescuedDataColumn, les incompatibilités de type de données n’entraînent pas la suppression des enregistrements. Seuls les enregistrements endommagés sont supprimés, tels que json incomplet ou mal formé. |
FAILFAST |
Lève une exception lorsque l’analyseur rencontre des enregistrements corrompus. Lorsque vous utilisez le mode FAILFAST avec rescuedDataColumn, les incompatibilités de types de données ne génèrent pas d’erreur. Seuls les enregistrements endommagés génèrent des erreurs, par exemple un JSON incomplet ou mal formé. |
Reportez-vous à un champ dans la sortie de from_json
from_json déduit le schéma pendant l’exécution du pipeline. Si une requête en aval fait référence à un from_json champ avant l’exécution réussie de la from_json fonction au moins une fois, le champ ne se résout pas et la requête est ignorée. Dans l’exemple suivant, l’analyse de la requête de table silver est ignorée jusqu’à ce que la from_json fonction dans la requête bronze ait exécuté et déduit le schéma.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
Si la from_json fonction et les champs auxquels elle déduit sont référencées dans la même requête, l’analyse peut échouer comme dans l’exemple suivant :
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Vous pouvez résoudre ce problème en déplaçant la référence du champ from_json dans une requête ultérieure (comme l’exemple bronze/argent ci-dessus.) Vous pouvez également spécifier les schemaHints qui contiennent les champs from_json référencés. Par exemple:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Exemples : déduire et faire évoluer automatiquement le schéma
Cette section fournit un exemple de code permettant d’activer l’inférence et l’évolution automatiques de schéma à l’aide de from_json dans les pipelines déclaratifs Spark de Lakeflow.
Créer une table de diffusion en continu à partir du stockage d’objets cloud
L’exemple suivant utilise read_files la syntaxe pour créer une table de streaming à partir du stockage d’objets cloud.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Créer une table de diffusion en continu à partir de Kafka
L’exemple suivant utilise la syntaxe read_kafka pour créer une table en streaming à partir de Kafka.
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Exemples : Schéma fixe
Pour obtenir un exemple de code utilisant from_json un schéma fixe, consultez from_json la fonction.
Questions fréquentes (FAQ)
Cette section répond aux questions fréquemment posées sur l’inférence de schéma et la prise en charge de l’évolution dans la from_json fonction.
Quelle est la différence entre from_json et parse_json?
La parse_json fonction retourne une VARIANT valeur de la chaîne JSON.
VARIANT offre un moyen flexible et efficace de stocker des données semi-structurées. Cela contourne l’inférence de schéma et l’évolution en supprimant complètement les types stricts. Toutefois, si vous souhaitez appliquer un schéma au moment de l’écriture (par exemple, parce que vous disposez d’un schéma relativement strict), from_json cela peut être une meilleure option.
Le tableau suivant décrit les différences entre from_json et parse_json:
| Fonction | Cas d’utilisation | Availability |
|---|---|---|
from_json |
Évolution du schéma avec from_json et maintien du schéma. Cela est utile lorsque :
|
Disponible uniquement avec l’inférence de schéma et l’évolution dans les pipelines déclaratifs Spark Lakeflow |
parse_json |
VARIANT est particulièrement adapté à la conservation des données qui n’ont pas besoin d’être schématisées. Par exemple:
|
Disponible avec et sans pipelines déclaratifs Spark Lakeflow |
Puis-je utiliser la from_json syntaxe d’inférence et d’évolution de schéma en dehors des pipelines déclaratifs Spark Lakeflow ?
Non, vous ne pouvez pas utiliser la syntaxe d'inférence et d'évolution de schéma from_json en dehors des pipelines déclaratifs Spark Lakeflow.
Comment accéder au schéma déduit par from_json?
Affichez le schéma de la table de diffusion en continu cible.
Puis-je passer from_json un schéma et faire aussi l’évolution ?
Non, vous ne pouvez pas passer from_json un schéma et faire également l’évolution. Toutefois, vous pouvez fournir des indicateurs de schéma pour remplacer certains ou tous les champs déduits par from_json.
Que se passe-t-il pour le schéma si la table est entièrement actualisée ?
Les emplacements de schéma associés à la table sont vidés et le schéma est réanalysé à partir de zéro.