Partager via


Inférer et faire évoluer le schéma à l’aide de from_json dans les pipelines

Important

Cette fonctionnalité est en version préliminaire publique.

Cet article explique comment déduire et faire évoluer le schéma des objets blob JSON avec la from_json fonction SQL dans les pipelines déclaratifs Spark Lakeflow.

Aperçu

La from_json fonction SQL analyse une colonne de chaîne JSON et retourne une valeur de struct. Lorsqu’il est utilisé en dehors d’un pipeline, vous devez fournir explicitement le schéma de la valeur retournée à l’aide de l’argument schema . Lorsqu’il est utilisé avec les pipelines déclaratifs Spark Lakeflow, vous pouvez activer l’inférence et l’évolution du schéma, qui gère automatiquement le schéma de la valeur retournée. Cette fonctionnalité simplifie à la fois la configuration initiale (en particulier lorsque le schéma est inconnu) et les opérations en cours lorsque le schéma change fréquemment. Il permet de traiter en toute transparence des objets blob JSON arbitraires à partir de sources de données de streaming telles que Auto Loader, Kafka ou Kinesis.

Plus précisément, lorsqu’elle est utilisée dans un pipeline, l’inférence de schéma et l’évolution pour la fonction SQL from_json peuvent :

  • Détecter de nouveaux champs dans les enregistrements JSON entrants (y compris les objets JSON imbriqués)
  • Déduire les types de champs et les mapper aux types de données Spark appropriés
  • Faire évoluer automatiquement le schéma pour prendre en charge les nouveaux champs
  • Gérer automatiquement les données qui ne sont pas conformes au schéma actuel

Syntaxe : déduire et faire évoluer automatiquement le schéma

Pour activer l’inférence de schéma avec from_json dans un pipeline, définissez le schéma sur NULL et spécifiez l’option schemaLocationKey. Cela lui permet d’inférer et de suivre le schéma.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Python

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

Une requête peut avoir plusieurs from_json expressions, mais chaque expression doit avoir une expression unique schemaLocationKey. Il schemaLocationKey doit également être unique pour chaque pipeline.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Syntaxe : schéma fixe

Si vous souhaitez appliquer un schéma particulier à la place, vous pouvez utiliser la syntaxe suivante from_json pour analyser la chaîne JSON à l’aide de ce schéma :

from_json(jsonStr, schema, [, options])

Cette syntaxe peut être utilisée dans n’importe quel environnement Azure Databricks, y compris les pipelines déclaratifs Spark Lakeflow. Plus d’informations sont disponibles ici.

Inférence de schéma

from_json déduit le schéma du premier lot de colonnes de données JSON et l’indexe en interne par son schemaLocationKey (obligatoire).

Si la chaîne JSON est un objet unique (par exemple, {"id": 123, "name": "John"}), from_json déduit un schéma de type STRUCT et ajoute une rescuedDataColumn à la liste des champs.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Toutefois, si la chaîne JSON a un tableau de niveau supérieur (par exemple ["id": 123, "name": "John"]), from_json encapsule le TABLEAU dans un STRUCT. Cette approche permet de sauver des données incompatibles avec le schéma déduit. Vous avez la possibilité de décomposer les valeurs du tableau en lignes distinctes plus bas.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Remplacer l’inférence de schéma à l’aide d’indicateurs de schéma

Vous pouvez éventuellement fournir schemaHints pour influencer la manière dont from_json déduit le type d'une colonne. Cela est utile lorsque vous savez qu’une colonne est d’un type de données spécifique ou si vous souhaitez choisir un type de données plus général (par exemple, un double au lieu d’un entier). Vous pouvez fournir un nombre arbitraire d’indicateurs pour les types de données de colonne à l’aide de la syntaxe de spécification de schéma SQL. La sémantique des indicateurs de schéma est la même que pour les indicateurs de schéma du chargeur automatique. Par exemple:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Lorsque la chaîne JSON contient un TABLEAU de niveau supérieur, elle est encapsulée dans une STRUCTURE. Dans ces cas, les indicateurs de schéma sont appliqués au schéma ARRAY au lieu du STRUCT encapsulé. Par exemple, considérez une chaîne JSON avec un tableau de niveau supérieur, par exemple :

[{"id": 123, "name": "John"}]

Le schéma ARRAY déduit est encapsulé dans un STRUCT :

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Pour modifier le type de données de id, spécifiez l’indice de schéma sous la forme element.id STRING. Pour ajouter une nouvelle colonne de type DOUBLE, spécifiez element.new_col DOUBLE. En raison de ces indicateurs, le schéma du tableau JSON de niveau supérieur devient :

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Faire évoluer le schéma à l’aide de schemaEvolutionMode

from_json détecte l’ajout de nouvelles colonnes au fur et à mesure qu’elle traite vos données. Lorsqu’un nouveau champ est détecté par from_json, le schéma déduit est mis à jour avec le schéma le plus récent en ajoutant les nouvelles colonnes à la fin du schéma. Les types de données des colonnes existantes restent inchangés. Après la mise à jour du schéma, le pipeline redémarre automatiquement avec le schéma mis à jour.

from_json prend en charge les modes suivants pour l’évolution du schéma, que vous définissez à l’aide du paramètre facultatif schemaEvolutionMode . Ces modes sont cohérents avec le chargeur automatique.

schemaEvolutionMode Comportement lors de la lecture d’une nouvelle colonne
addNewColumns (valeur par défaut) Échec du flux. De nouvelles colonnes sont ajoutées au schéma. Les colonnes existantes n’évoluent pas dans les types de données.
rescue Le schéma n’est jamais évolué et le flux n’échoue pas en raison des modifications de schéma. Toutes les nouvelles colonnes sont enregistrées dans la colonne de données sauvée.
failOnNewColumns Échec du flux. Stream ne redémarre pas, sauf si les schemaHints données sont mises à jour ou si les données incriminées sont supprimées.
none N’évolue pas le schéma, les nouvelles colonnes sont ignorées et les données ne sont pas sauvées, sauf si l’option rescuedDataColumn est définie. Le flux n’échoue pas en raison des modifications de schéma.

Par exemple:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

Colonne de données sauvées

Une colonne de données sauvée est automatiquement ajoutée à votre schéma en tant que _rescued_data. Vous pouvez renommer la colonne en définissant l’option rescuedDataColumn . Par exemple:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

Lorsque vous choisissez d’utiliser la colonne de données sauvée, toutes les colonnes qui ne correspondent pas au schéma déduit sont sauvées au lieu d’être supprimées. Cela peut se produire en raison d’une incompatibilité de type de données, d’une colonne manquante dans le schéma ou d’une différence de casse de nom de colonne.

Gérer les enregistrements endommagés

Pour stocker les enregistrements mal formés et ne peuvent pas être analysés, ajoutez une _corrupt_record colonne en définissant des indicateurs de schéma, comme dans l’exemple suivant :

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Pour renommer la colonne d’enregistrement endommagée, définissez l’option columnNameOfCorruptRecord .

L’analyseur JSON prend en charge trois modes de gestion des enregistrements endommagés :

Mode Descriptif
PERMISSIVE Pour les enregistrements endommagés, place la chaîne mal formée dans un champ configuré par columnNameOfCorruptRecord et définit les champs mal formés sur null. Pour conserver les enregistrements endommagés, vous pouvez définir un champ de type chaîne nommé columnNameOfCorruptRecord dans un schéma défini par l’utilisateur. Si un schéma n’a pas le champ, les enregistrements endommagés sont supprimés pendant l’analyse. Lors de l’inférence d’un schéma, l’analyseur ajoute implicitement un columnNameOfCorruptRecord champ dans le schéma de sortie.
DROPMALFORMED Ignore les enregistrements endommagés.
Lorsque vous utilisez DROPMALFORMED le mode avec rescuedDataColumn, les incompatibilités de type de données n’entraînent pas la suppression des enregistrements. Seuls les enregistrements endommagés sont supprimés, tels que json incomplet ou mal formé.
FAILFAST Lève une exception lorsque l’analyseur rencontre des enregistrements corrompus.
Lorsque vous utilisez le mode FAILFAST avec rescuedDataColumn, les incompatibilités de types de données ne génèrent pas d’erreur. Seuls les enregistrements endommagés génèrent des erreurs, par exemple un JSON incomplet ou mal formé.

Reportez-vous à un champ dans la sortie de from_json

from_json déduit le schéma pendant l’exécution du pipeline. Si une requête en aval fait référence à un from_json champ avant l’exécution réussie de la from_json fonction au moins une fois, le champ ne se résout pas et la requête est ignorée. Dans l’exemple suivant, l’analyse de la requête de table silver est ignorée jusqu’à ce que la from_json fonction dans la requête bronze ait exécuté et déduit le schéma.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

Si la from_json fonction et les champs auxquels elle déduit sont référencées dans la même requête, l’analyse peut échouer comme dans l’exemple suivant :

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Vous pouvez résoudre ce problème en déplaçant la référence du champ from_json dans une requête ultérieure (comme l’exemple bronze/argent ci-dessus.) Vous pouvez également spécifier les schemaHints qui contiennent les champs from_json référencés. Par exemple:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Exemples : déduire et faire évoluer automatiquement le schéma

Cette section fournit un exemple de code permettant d’activer l’inférence et l’évolution automatiques de schéma à l’aide de from_json dans les pipelines déclaratifs Spark de Lakeflow.

Créer une table de diffusion en continu à partir du stockage d’objets cloud

L’exemple suivant utilise read_files la syntaxe pour créer une table de streaming à partir du stockage d’objets cloud.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python

@dp.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Créer une table de diffusion en continu à partir de Kafka

L’exemple suivant utilise la syntaxe read_kafka pour créer une table en streaming à partir de Kafka.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Python

@dp.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Exemples : Schéma fixe

Pour obtenir un exemple de code utilisant from_json un schéma fixe, consultez from_json la fonction.

Questions fréquentes (FAQ)

Cette section répond aux questions fréquemment posées sur l’inférence de schéma et la prise en charge de l’évolution dans la from_json fonction.

Quelle est la différence entre from_json et parse_json?

La parse_json fonction retourne une VARIANT valeur de la chaîne JSON.

VARIANT offre un moyen flexible et efficace de stocker des données semi-structurées. Cela contourne l’inférence de schéma et l’évolution en supprimant complètement les types stricts. Toutefois, si vous souhaitez appliquer un schéma au moment de l’écriture (par exemple, parce que vous disposez d’un schéma relativement strict), from_json cela peut être une meilleure option.

Le tableau suivant décrit les différences entre from_json et parse_json:

Fonction Cas d’utilisation Availability
from_json Évolution du schéma avec from_json et maintien du schéma. Cela est utile lorsque :
  • Vous souhaitez appliquer votre schéma de données (par exemple, examiner chaque modification de schéma avant de la rendre persistante).
  • Vous souhaitez optimiser le stockage et exiger une faible latence et un coût de requête faible.
  • Vous souhaitez échouer sur les données avec des types incompatibles.
  • Vous souhaitez extraire des résultats partiels à partir d’enregistrements JSON endommagés et stocker l’enregistrement mal formé dans la _corrupt_record colonne. En revanche, l’ingestion VARIANT retourne une erreur pour JSON non valide.
Disponible uniquement avec l’inférence de schéma et l’évolution dans les pipelines déclaratifs Spark Lakeflow
parse_json VARIANT est particulièrement adapté à la conservation des données qui n’ont pas besoin d’être schématisées. Par exemple:
  • Vous souhaitez conserver les données semi-structurées, car elles sont flexibles.
  • Le schéma change trop rapidement pour l'adapter à un schéma sans échecs de flux fréquents et redémarrages.
  • Vous ne souhaitez pas échouer sur les données avec des types incompatibles. (L’ingestion VARIANT réussit toujours pour les enregistrements JSON valides, même s’il existe des incompatibilités de type.)
  • Vos utilisateurs ne souhaitent pas gérer la colonne de données sauvée contenant des champs qui ne sont pas conformes au schéma.
Disponible avec et sans pipelines déclaratifs Spark Lakeflow

Puis-je utiliser la from_json syntaxe d’inférence et d’évolution de schéma en dehors des pipelines déclaratifs Spark Lakeflow ?

Non, vous ne pouvez pas utiliser la syntaxe d'inférence et d'évolution de schéma from_json en dehors des pipelines déclaratifs Spark Lakeflow.

Comment accéder au schéma déduit par from_json?

Affichez le schéma de la table de diffusion en continu cible.

Puis-je passer from_json un schéma et faire aussi l’évolution ?

Non, vous ne pouvez pas passer from_json un schéma et faire également l’évolution. Toutefois, vous pouvez fournir des indicateurs de schéma pour remplacer certains ou tous les champs déduits par from_json.

Que se passe-t-il pour le schéma si la table est entièrement actualisée ?

Les emplacements de schéma associés à la table sont vidés et le schéma est réanalysé à partir de zéro.