Événements
31 mars, 23 h - 2 avr., 23 h
L’événement de la communauté Microsoft Fabric, Power BI, SQL et AI ultime. 31 mars au 2 avril 2025.
Inscrivez-vous aujourd’huiCe navigateur n’est plus pris en charge.
Effectuez une mise à niveau vers Microsoft Edge pour tirer parti des dernières fonctionnalités, des mises à jour de sécurité et du support technique.
Auto Loader simplifie un certain nombre de tâches courantes d’ingestion de données. Cet aide-mémoire fournit des exemples de plusieurs modèles populaires.
Les modèles Glob peuvent être utilisés pour filtrer les répertoires et les fichiers lorsqu’ils sont fournis dans le chemin d’accès.
Modèle | Description |
---|---|
? |
Correspond à n’importe quel caractère unique |
* |
Correspond à zéro, un ou plusieurs caractères |
[abc] |
Correspond à un seul caractère du jeu de caractères {a,b,c}. |
[a-z] |
Correspond à un seul caractère de la plage de caractères {a…z}. |
[^a] |
Correspond à un seul caractère qui ne fait pas partie du jeu ou de la plage de caractères {a}. Notez que le caractère ^ doit se trouver immédiatement à droite du crochet ouvrant. |
{ab,cd} |
Correspond à une chaîne du jeu de chaînes {ab, cd}. |
{ab,c{de, fh}} |
Correspond à une chaîne du jeu de chaînes {ab, cde, cfh}. |
Utilisez le path
pour fournir des modèles de préfixe, par exemple :
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Important
Vous devez utiliser l’option pathGlobFilter
pour fournir explicitement des modèles de suffixes. Le path
ne fournit qu’un filtre de préfixe.
Par exemple, si vous souhaitez analyser uniquement les fichiers png
d’un répertoire qui contient des fichiers avec des suffixes différents, vous pouvez effectuer ceci :
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Notes
Le comportement d’utilisation des caractères génériques (globbing) par défaut d’Auto Loader est différent de celui des autres sources de fichiers Spark. Ajoutez .option("cloudFiles.useStrictGlobber", "true")
à votre lecture pour utiliser le globbing qui correspond au comportement Spark par défaut sur les sources de fichiers. Pour plus d’informations sur le globbing, consultez le tableau suivant :
Modèle | Chemins d'accès au fichier | Globber par défaut | Globber strict |
---|---|---|---|
/a/b | /a/b/c/file.txt | Oui | Oui |
/a/b | /a/b_dir/c/file.txt | Non | Non |
/a/b | /a/b.txt | Non | Non |
/a/b/ | /a/b.txt | Non | Non |
/a/*/c/ | /a/b/c/file.txt | Oui | Oui |
/a/*/c/ | /a/b/c/d/file.txt | Oui | Oui |
/a/*/c/ | /a/b/x/y/c/file.txt | Oui | Non |
/a/*/c | /a/b/c_file.txt | Oui | Non |
/a/*/c/ | /a/b/c_file.txt | Oui | Non |
/a/*/c/ | /a/*/cookie/file.txt | Oui | Non |
/a/b* | /a/b.txt | Oui | Oui |
/a/b* | /a/b/file.txt | Oui | Oui |
/a/{0.txt,1.txt} | /a/0.txt | Oui | Oui |
/a/*/{0.txt,1.txt} | /a/0.txt | Non | Non |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | Oui | Oui |
Un moyen simple de transférer vos données dans Delta Lake sans en perdre consiste à utiliser le modèle suivant et à activer l'inférence de schéma avec Auto Loader. Databricks recommande d'exécuter le code suivant dans une tâche Azure Databricks pour qu'il redémarre automatiquement votre stream lorsque le schéma de vos données source change. Par défaut, le schéma est déduit en tant que types de chaîne. Toute erreur d'analyse (il ne devrait pas y en avoir si tout reste sous forme de chaîne) ira dans _rescued_data
et toute nouvelle colonne fera échouer le stream et fera évoluer le schéma.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Lorsque vous connaissez votre schéma, mais que vous voulez savoir si vous recevez des données inattendues, Databricks recommande d'utiliser rescuedDataColumn
.
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Si vous voulez que votre stream arrête le traitement si un nouveau champ est introduit ne correspondant pas à votre schéma, vous pouvez ajouter :
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Lorsque vous recevez des données d'un fournisseur qui introduit de nouvelles colonnes dans les informations qu'il fournit, il se peut que vous ne sachiez pas exactement quand il le fait, ou que vous n'ayez pas la bande passante nécessaire pour mettre à jour votre pipeline de données. Vous pouvez désormais tirer parti de l’évolution du schéma pour redémarrer le stream et laisser Auto Loader mettre à jour automatiquement le schéma déduit. Vous pouvez également utiliser schemaHints
pour certains des champs « sans schéma » que le fournisseur peut transmettre.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Étant donné que le chargeur automatique déduit les colonnes JSON de niveau supérieur en tant que chaînes, vous pouvez vous retrouver avec des objets JSON imbriqués qui nécessitent d’autres transformations. Vous pouvez utiliser des API d’accès aux données semi-structurées pour transformer davantage de contenu JSON complexe.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Quand vous avez des données imbriquées, vous pouvez utiliser l’option cloudFiles.inferColumnTypes
pour inférer la structure imbriquée de vos données et d’autres types de colonnes.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Une fois les données stockées dans Delta Lake, vous pouvez exécuter l’inférence distribuée sur les données. Consultez Effectuer une inférence distribuée à l'aide de pandas UDF.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Delta Live Tables fournit une syntaxe Python légèrement modifiée pour Auto Loader et ajoute la prise en charge SQL pour Auto Loader.
Les exemples suivants utilisent Auto Loader pour créer des jeux de données à partir de fichiers CSV et JSON :
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map()
, vous pouvez transmettre des options à la méthode cloud_files()
. Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
L’exemple suivant lit des données à partir de fichiers CSV délimités par des tabulations avec un en-tête :
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
Vous pouvez utiliser l’élément schema
pour spécifier le format manuellement. Vous devez spécifier cet élément schema
pour les formats qui ne prennent pas en charge l’inférence de schéma :
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
Notes
Delta Live Tables configure et gère automatiquement les répertoires de schéma et de point de contrôle lors de l’utilisation d’Auto Loader pour lire les fichiers. Toutefois, si vous configurez manuellement l’un de ces répertoires, l’actualisation complète n’affecte pas le contenu des répertoires configurés. Databricks recommande d’utiliser les répertoires configurés automatiquement pour éviter des effets secondaires inattendus lors du traitement.
Événements
31 mars, 23 h - 2 avr., 23 h
L’événement de la communauté Microsoft Fabric, Power BI, SQL et AI ultime. 31 mars au 2 avril 2025.
Inscrivez-vous aujourd’hui