Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Vous pouvez charger des données à partir de n’importe quelle source de données prise en charge par Apache Spark sur Azure Databricks à l’aide de pipelines déclaratifs Lakeflow. Vous pouvez définir des jeux de données (tables et vues) dans les pipelines déclaratifs de Lakeflow pour n’importe quelle requête qui retourne un DataFrame Spark, y compris les DataFrames de streaming et les DataFrames Pandas pour Spark. Pour les tâches d’ingestion de données, Databricks recommande d’utiliser des tables de streaming pour la plupart des cas d’usage. Les tables de streaming sont utiles pour ingérer des données à partir du stockage d’objets cloud à l’aide d’Auto Loader ou de bus de messages comme Kafka.
Remarque
- Les sources de données ne prennent pas toutes en charge SQL. Vous pouvez combiner des notebooks SQL et Python dans des pipelines déclaratifs Lakeflow pour utiliser SQL pour toutes les opérations au-delà de l’ingestion.
- Pour plus d’informations sur l’utilisation des bibliothèques non empaquetées dans les pipelines déclaratifs Lakeflow par défaut, consultez Gérer les dépendances Python pour les pipelines déclaratifs Lakeflow.
- Pour obtenir des informations générales sur l’ingestion dans Azure Databricks, consultez les connecteurs Standard dans Lakeflow Connect.
Ci-dessous, des exemples de modèles courants.
Charger depuis une table existante
Chargez des données à partir de n’importe quelle table existante dans Azure Databricks. Vous pouvez transformer les données à l’aide d’une requête ou charger la table pour un traitement ultérieur dans votre pipeline.
L’exemple suivant lit les données d’une table existante :
Python
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Charger des fichiers à partir du stockage d’objets cloud
Databricks recommande d’utiliser le chargeur automatique avec des pipelines déclaratifs Lakeflow pour la plupart des tâches d’ingestion de données à partir du stockage d’objets cloud ou à partir de fichiers dans un volume de catalogue Unity. Le chargeur automatique et les pipelines déclaratifs Lakeflow sont conçus pour charger de manière incrémentielle et idempotente des données toujours croissantes à mesure qu’elles arrivent dans le stockage cloud.
Voir Qu’est-ce que le chargeur automatique ? et charger des données à partir du stockage d’objets.
L’exemple suivant lit les données du stockage cloud à l’aide du chargeur automatique :
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
Les exemples suivants utilisent le chargeur automatique pour créer des jeux de données à partir de fichiers CSV dans un volume de catalogue Unity :
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Remarque
- Si vous utilisez Auto Loader avec les notifications de fichiers, et si vous exécutez une actualisation complète pour votre pipeline ou votre table de streaming, vous devez nettoyer manuellement vos ressources. Vous pouvez utiliser CloudFilesResourceManager dans un notebook pour effectuer le nettoyage.
- Pour charger des fichiers avec le chargeur automatique dans un pipeline compatible avec le catalogue Unity, vous devez utiliser des emplacements externes. Pour en savoir plus sur l’utilisation du catalogue Unity avec des pipelines déclaratifs Lakeflow, consultez Utiliser le catalogue Unity avec vos pipelines déclaratifs Lakeflow.
Charger des données à partir d’un bus de messages
Vous pouvez configurer des pipelines déclaratifs Lakeflow pour ingérer des données à partir de bus de messages. Databricks recommande d’utiliser des tables de diffusion en continu avec une exécution continue et une mise à l’échelle automatique améliorée pour fournir l’ingestion la plus efficace pour le chargement à faible latence à partir de bus de messages. Voir Optimiser l’utilisation du cluster des pipelines déclaratifs Lakeflow avec la mise à l’échelle automatique.
Par exemple, le code suivant configure une table de diffusion en continu pour ingérer des données à partir de Kafka à l’aide de la fonction read_kafka :
Python
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Pour ingérer depuis d’autres sources de bus de messages, consultez :
- Kinesis : read_kinesis
- Pub/Sub rubrique : read_pubsub
- Pulsar : read_pulsar
Charger des données à partir d’Azure Event Hubs
Azure Event Hubs est un service de streaming de données qui fournit une interface compatible Apache Kafka. Vous pouvez utiliser le connecteur Kafka de Structured Streaming, inclus dans l'environnement d'exécution des pipelines déclaratifs Lakeflow, pour charger des messages depuis Azure Event Hubs. Pour en savoir plus sur le chargement et le traitement des messages à partir d'Azure Event Hubs, consultez Utiliser Azure Event Hubs comme source de données pour les pipelines déclaratifs Lakeflow.
Charger des données à partir de systèmes externes
Lakeflow Declarative Pipelines prend en charge le chargement de données à partir de n’importe quelle source de données prise en charge par Azure Databricks. Consultez Se connecter aux sources de données et aux services externes. Vous pouvez également charger des données externes à l’aide de Lakehouse Federation pour les sources de données prises en charge. Étant donné que Lakehouse Federation nécessite Databricks Runtime 13.3 LTS ou une version ultérieure, pour utiliser Lakehouse Federation, votre pipeline doit être configuré pour utiliser le canal en préversion.
Certaines sources de données n’ont pas de prise en charge équivalente en SQL. Si vous ne pouvez pas utiliser Lakehouse Federation avec l’une de ces sources de données, vous pouvez utiliser un notebook Python pour ingérer les données à partir de la source. Vous pouvez ajouter du code source Python et SQL au même pipeline. L’exemple suivant déclare une vue matérialisée pour accéder à l’état actuel des données dans une table PostgreSQL distante :
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Charger des jeux de données petits ou statiques à partir du stockage d’objets cloud
Vous pouvez charger des jeux de données petits ou statiques à l’aide de la syntaxe de chargement Apache Spark. Lakeflow Declarative Pipelines prend en charge tous les formats de fichier pris en charge par Apache Spark sur Azure Databricks. Pour obtenir une liste complète, consultez les options de format de données.
Les exemples suivants illustrent le chargement de JSON pour créer des tables de pipelines déclaratifs Lakeflow.
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Remarque
La fonction SQL read_files
est commune à tous les environnements SQL sur Azure Databricks. Il s’agit du modèle recommandé pour l’accès direct aux fichiers à l’aide de SQL avec des pipelines déclaratifs Lakeflow. Pour plus d’informations, consultez Options.
Configurer une table de streaming pour ignorer les modifications dans une table de streaming source
Remarque
- L’indicateur
skipChangeCommits
ne fonctionne qu'avecspark.readStream
en utilisant la fonctionoption()
. Vous ne pouvez pas utiliser cet indicateur dans une fonctiondlt.read_stream()
. - Vous ne pouvez pas utiliser l’indicateur
skipChangeCommits
lorsque la table de diffusion en continu source est définie comme cible d’une fonction create_auto_cdc_flow().
Par défaut, les tables de streaming nécessitent des sources en ajout uniquement. Lorsqu'une table de streaming utilise une autre table de streaming comme source et que la table de streaming source nécessite des mises à jour ou des suppressions, par exemple, le traitement du « droit à l'oubli » RGPD, l'indicateur skipChangeCommits
peut être défini lors de la lecture de la table de streaming pour ignorer ces modifications. Pour plus d’informations sur cet indicateur, consultez Ignorer les mises à jour et les suppressions.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Accéder de manière sécurisée aux identifiants de stockage avec des secrets dans un pipeline
Vous pouvez utiliser des secrets Azure Databricks pour stocker des informations d’identification telles que des clés d’accès ou des mots de passe. Pour configurer le secret dans votre pipeline, utilisez une propriété Spark dans la configuration de cluster des paramètres de pipeline. Consultez Configurer le calcul pour les pipelines déclaratifs Lakeflow.
L’exemple suivant utilise un secret pour stocker une clé d’accès requise pour lire les données d’entrée à partir d’un compte de stockage Azure Data Lake Storage (ADLS) à l’aide du chargeur automatique. Vous pouvez utiliser cette même méthode pour configurer tout secret requis par votre pipeline, par exemple, des clés AWS pour accéder à S3 ou le mot de passe pour un metastore Apache Hive.
Pour en savoir plus sur l’utilisation d’Azure Data Lake Storage, consultez Se connecter à Azure Data Lake Storage et au Stockage Blob.
Remarque
Vous devez ajouter le préfixe spark.hadoop.
à la clé de configuration spark_conf
qui définit la valeur du secret.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Remplacer
-
<storage-account-name>
par le nom du compte de stockage ADLS. -
<scope-name>
par le nom de l'étendue du secret Azure Databricks. -
<secret-name>
par le nom de la clé contenant la clé d'accès du compte de stockage Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Remplacer
-
<container-name>
par le nom du conteneur de compte de stockage Azure qui stocke les données d’entrée. -
<storage-account-name>
par le nom du compte de stockage ADLS. -
<path-to-input-dataset>
par le chemin du jeu de données d’entrée.