Lire en anglais

Partager via


Modèles courants de chargement de données

Auto Loader simplifie un certain nombre de tâches courantes d’ingestion de données. Cet aide-mémoire fournit des exemples de plusieurs modèles populaires.

Filtrage des répertoires ou des fichiers à l’aide de modèles Glob

Les modèles Glob peuvent être utilisés pour filtrer les répertoires et les fichiers lorsqu’ils sont fournis dans le chemin d’accès.

Modèle Description
? Correspond à n’importe quel caractère unique
* Correspond à zéro, un ou plusieurs caractères
[abc] Correspond à un seul caractère du jeu de caractères {a,b,c}.
[a-z] Correspond à un seul caractère de la plage de caractères {a…z}.
[^a] Correspond à un seul caractère qui ne fait pas partie du jeu ou de la plage de caractères {a}. Notez que le caractère ^ doit se trouver immédiatement à droite du crochet ouvrant.
{ab,cd} Correspond à une chaîne du jeu de chaînes {ab, cd}.
{ab,c{de, fh}} Correspond à une chaîne du jeu de chaînes {ab, cde, cfh}.

Utilisez le path pour fournir des modèles de préfixe, par exemple :

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Important

Vous devez utiliser l’option pathGlobFilter pour fournir explicitement des modèles de suffixes. Le path ne fournit qu’un filtre de préfixe.

Par exemple, si vous souhaitez analyser uniquement les fichiers png d’un répertoire qui contient des fichiers avec des suffixes différents, vous pouvez effectuer ceci :

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Notes

Le comportement d’utilisation des caractères génériques (globbing) par défaut d’Auto Loader est différent de celui des autres sources de fichiers Spark. Ajoutez .option("cloudFiles.useStrictGlobber", "true") à votre lecture pour utiliser le globbing qui correspond au comportement Spark par défaut sur les sources de fichiers. Pour plus d’informations sur le globbing, consultez le tableau suivant :

Modèle Chemins d'accès au fichier Globber par défaut Globber strict
/a/b /a/b/c/file.txt Oui Oui
/a/b /a/b_dir/c/file.txt Non Non
/a/b /a/b.txt Non Non
/a/b/ /a/b.txt Non Non
/a/*/c/ /a/b/c/file.txt Oui Oui
/a/*/c/ /a/b/c/d/file.txt Oui Oui
/a/*/c/ /a/b/x/y/c/file.txt Oui Non
/a/*/c /a/b/c_file.txt Oui Non
/a/*/c/ /a/b/c_file.txt Oui Non
/a/*/c/ /a/*/cookie/file.txt Oui Non
/a/b* /a/b.txt Oui Oui
/a/b* /a/b/file.txt Oui Oui
/a/{0.txt,1.txt} /a/0.txt Oui Oui
/a/*/{0.txt,1.txt} /a/0.txt Non Non
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Oui Oui

Activer Easy ETL

Un moyen simple de transférer vos données dans Delta Lake sans en perdre consiste à utiliser le modèle suivant et à activer l'inférence de schéma avec Auto Loader. Databricks recommande d'exécuter le code suivant dans une tâche Azure Databricks pour qu'il redémarre automatiquement votre stream lorsque le schéma de vos données source change. Par défaut, le schéma est déduit en tant que types de chaîne. Toute erreur d'analyse (il ne devrait pas y en avoir si tout reste sous forme de chaîne) ira dans _rescued_data et toute nouvelle colonne fera échouer le stream et fera évoluer le schéma.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Empêcher la perte de données dans les données bien structurées

Lorsque vous connaissez votre schéma, mais que vous voulez savoir si vous recevez des données inattendues, Databricks recommande d'utiliser rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Si vous voulez que votre stream arrête le traitement si un nouveau champ est introduit ne correspondant pas à votre schéma, vous pouvez ajouter :

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Activer des pipelines de données semi-structurés flexibles

Lorsque vous recevez des données d'un fournisseur qui introduit de nouvelles colonnes dans les informations qu'il fournit, il se peut que vous ne sachiez pas exactement quand il le fait, ou que vous n'ayez pas la bande passante nécessaire pour mettre à jour votre pipeline de données. Vous pouvez désormais tirer parti de l’évolution du schéma pour redémarrer le stream et laisser Auto Loader mettre à jour automatiquement le schéma déduit. Vous pouvez également utiliser schemaHints pour certains des champs « sans schéma » que le fournisseur peut transmettre.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformer des données JSON imbriquées

Étant donné que le chargeur automatique déduit les colonnes JSON de niveau supérieur en tant que chaînes, vous pouvez vous retrouver avec des objets JSON imbriqués qui nécessitent d’autres transformations. Vous pouvez utiliser des API d’accès aux données semi-structurées pour transformer davantage de contenu JSON complexe.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inférer des données JSON imbriquées

Quand vous avez des données imbriquées, vous pouvez utiliser l’option cloudFiles.inferColumnTypes pour inférer la structure imbriquée de vos données et d’autres types de colonnes.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Charger des fichiers CSV sans en-têtes

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Appliquer un schéma sur des fichiers CSV avec des en-têtes

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingérer des données image ou binaires dans Delta Lake pour ML

Une fois les données stockées dans Delta Lake, vous pouvez exécuter l’inférence distribuée sur les données. Consultez Effectuer une inférence distribuée à l'aide de pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Syntaxe Auto Loader pour DLT

Delta Live Tables fournit une syntaxe Python légèrement modifiée pour Auto Loader et ajoute la prise en charge SQL pour Auto Loader.

Les exemples suivants utilisent Auto Loader pour créer des jeux de données à partir de fichiers CSV et JSON :

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map(), vous pouvez transmettre des options à la méthode cloud_files(). Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

L’exemple suivant lit des données à partir de fichiers CSV délimités par des tabulations avec un en-tête :

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

Vous pouvez utiliser l’élément schema pour spécifier le format manuellement. Vous devez spécifier cet élément schema pour les formats qui ne prennent pas en charge l’inférence de schéma :

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Notes

Delta Live Tables configure et gère automatiquement les répertoires de schéma et de point de contrôle lors de l’utilisation d’Auto Loader pour lire les fichiers. Toutefois, si vous configurez manuellement l’un de ces répertoires, l’actualisation complète n’affecte pas le contenu des répertoires configurés. Databricks recommande d’utiliser les répertoires configurés automatiquement pour éviter des effets secondaires inattendus lors du traitement.