Modèles courants de chargement de données
Auto Loader simplifie un certain nombre de tâches courantes d’ingestion de données. Cet aide-mémoire fournit des exemples de plusieurs modèles populaires.
Filtrage des répertoires ou des fichiers à l’aide de modèles Glob
Les modèles Glob peuvent être utilisés pour filtrer les répertoires et les fichiers lorsqu’ils sont fournis dans le chemin d’accès.
Modèle | Description |
---|---|
? |
Correspond à n’importe quel caractère unique |
* |
Correspond à zéro, un ou plusieurs caractères |
[abc] |
Correspond à un seul caractère du jeu de caractères {a,b,c}. |
[a-z] |
Correspond à un seul caractère de la plage de caractères {a…z}. |
[^a] |
Correspond à un seul caractère qui ne fait pas partie du jeu ou de la plage de caractères {a}. Notez que le caractère ^ doit se trouver immédiatement à droite du crochet ouvrant. |
{ab,cd} |
Correspond à une chaîne du jeu de chaînes {ab, cd}. |
{ab,c{de, fh}} |
Correspond à une chaîne du jeu de chaînes {ab, cde, cfh}. |
Utilisez le path
pour fournir des modèles de préfixe, par exemple :
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Important
Vous devez utiliser l’option pathGlobFilter
pour fournir explicitement des modèles de suffixes. Le path
ne fournit qu’un filtre de préfixe.
Par exemple, si vous souhaitez analyser uniquement les fichiers png
d’un répertoire qui contient des fichiers avec des suffixes différents, vous pouvez effectuer ceci :
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Notes
Le comportement d’utilisation des caractères génériques (globbing) par défaut d’Auto Loader est différent de celui des autres sources de fichiers Spark. Ajoutez .option("cloudFiles.useStrictGlobber", "true")
à votre lecture pour utiliser le globbing qui correspond au comportement Spark par défaut sur les sources de fichiers. Pour plus d’informations sur le globbing, consultez le tableau suivant :
Modèle | Chemins d'accès au fichier | Globber par défaut | Globber strict |
---|---|---|---|
/a/b | /a/b/c/file.txt | Oui | Oui |
/a/b | /a/b_dir/c/file.txt | Non | Non |
/a/b | /a/b.txt | Non | Non |
/a/b/ | /a/b.txt | Non | Non |
/a/*/c/ | /a/b/c/file.txt | Oui | Oui |
/a/*/c/ | /a/b/c/d/file.txt | Oui | Oui |
/a/*/c/ | /a/b/x/y/c/file.txt | Oui | Non |
/a/*/c | /a/b/c_file.txt | Oui | Non |
/a/*/c/ | /a/b/c_file.txt | Oui | Non |
/a/*/c/ | /a/*/cookie/file.txt | Oui | Non |
/a/b* | /a/b.txt | Oui | Oui |
/a/b* | /a/b/file.txt | Oui | Oui |
/a/{0.txt,1.txt} | /a/0.txt | Oui | Oui |
/a/*/{0.txt,1.txt} | /a/0.txt | Non | Non |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Oui | Oui |
Activer Easy ETL
Un moyen simple de transférer vos données dans Delta Lake sans en perdre consiste à utiliser le modèle suivant et à activer l'inférence de schéma avec Auto Loader. Databricks recommande d'exécuter le code suivant dans une tâche Azure Databricks pour qu'il redémarre automatiquement votre stream lorsque le schéma de vos données source change. Par défaut, le schéma est déduit en tant que types de chaîne. Toute erreur d'analyse (il ne devrait pas y en avoir si tout reste sous forme de chaîne) ira dans _rescued_data
et toute nouvelle colonne fera échouer le stream et fera évoluer le schéma.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Empêcher la perte de données dans les données bien structurées
Lorsque vous connaissez votre schéma, mais que vous voulez savoir si vous recevez des données inattendues, Databricks recommande d'utiliser rescuedDataColumn
.
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Si vous voulez que votre stream arrête le traitement si un nouveau champ est introduit ne correspondant pas à votre schéma, vous pouvez ajouter :
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Activer des pipelines de données semi-structurés flexibles
Lorsque vous recevez des données d'un fournisseur qui introduit de nouvelles colonnes dans les informations qu'il fournit, il se peut que vous ne sachiez pas exactement quand il le fait, ou que vous n'ayez pas la bande passante nécessaire pour mettre à jour votre pipeline de données. Vous pouvez désormais tirer parti de l’évolution du schéma pour redémarrer le stream et laisser Auto Loader mettre à jour automatiquement le schéma déduit. Vous pouvez également utiliser schemaHints
pour certains des champs « sans schéma » que le fournisseur peut transmettre.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transformer des données JSON imbriquées
Étant donné que le chargeur automatique déduit les colonnes JSON de niveau supérieur en tant que chaînes, vous pouvez vous retrouver avec des objets JSON imbriqués qui nécessitent d’autres transformations. Vous pouvez utiliser des API d’accès aux données semi-structurées pour transformer davantage de contenu JSON complexe.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Inférer des données JSON imbriquées
Quand vous avez des données imbriquées, vous pouvez utiliser l’option cloudFiles.inferColumnTypes
pour inférer la structure imbriquée de vos données et d’autres types de colonnes.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Charger des fichiers CSV sans en-têtes
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Appliquer un schéma sur des fichiers CSV avec des en-têtes
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingérer des données image ou binaires dans Delta Lake pour ML
Une fois les données stockées dans Delta Lake, vous pouvez exécuter l’inférence distribuée sur les données. Consultez Effectuer une inférence distribuée à l'aide de pandas UDF.
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Syntaxe Auto Loader pour DLT
Delta Live Tables fournit une syntaxe Python légèrement modifiée pour Auto Loader et ajoute la prise en charge SQL pour Auto Loader.
Les exemples suivants utilisent Auto Loader pour créer des jeux de données à partir de fichiers CSV et JSON :
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map()
, vous pouvez transmettre des options à la méthode cloud_files()
. Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
L’exemple suivant lit des données à partir de fichiers CSV délimités par des tabulations avec un en-tête :
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Vous pouvez utiliser l’élément schema
pour spécifier le format manuellement. Vous devez spécifier cet élément schema
pour les formats qui ne prennent pas en charge l’inférence de schéma :
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Notes
Delta Live Tables configure et gère automatiquement les répertoires de schéma et de point de contrôle lors de l’utilisation d’Auto Loader pour lire les fichiers. Toutefois, si vous configurez manuellement l’un de ces répertoires, l’actualisation complète n’affecte pas le contenu des répertoires configurés. Databricks recommande d’utiliser les répertoires configurés automatiquement pour éviter des effets secondaires inattendus lors du traitement.