Charger des données avec Delta Live Tables

Vous pouvez charger des données à partir de n’importe quelle source de données prise en charge par Apache Spark sur Azure Databricks à l’aide de Delta Live Tables. Vous pouvez définir des jeux de données (tables et vues) dans Delta Live Tables sur n’importe quelle requête qui retourne un DataFrame Spark, y compris les DataFrames et Pandas de diffusion en continu pour les DataFrames Spark. Pour les tâches d’ingestion de données, Databricks recommande d’utiliser des tables de streaming pour la plupart des cas d’usage. Les tables de streaming sont utiles pour ingérer des données à partir du stockage d’objets cloud à l’aide d’Auto Loader ou de bus de messages comme Kafka. Ci-dessous, des exemples de modèles courants.

Important

Toutes les sources de données ne prennent pas en charge SQL. Vous pouvez combiner des notebooks SQL et Python dans un pipeline Delta Live Tables pour utiliser SQL pour toutes les opérations au-delà de l’ingestion.

Pour plus d’informations sur l’utilisation de bibliothèques non empaquetées dans Delta Live Tables par défaut, consultez Gérer les dépendances Python pour les pipelines Delta Live Tables.

Charger des fichiers à partir du stockage d’objets cloud

Databricks recommande d’utiliser Auto Loader avec Delta Live Tables pour la plupart des tâches d’ingestion des données à partir du stockage d’objets cloud. Auto Loader et Delta Live Tables sont conçus pour charger de manière incrémentielle et idempotente le volume toujours croissant de données, à mesure que celles-ci arrivent dans le stockage cloud. Les exemples suivants utilisent Auto Loader pour créer des jeux de données à partir de fichiers CSV et JSON :

Remarque

Si vous souhaitez charger des fichiers avec Auto Loader dans un pipeline pour lequel Unity Catalog est activé, vous devez utiliser des emplacements externes. Pour en savoir plus sur l’utilisation de Unity Catalog avec Delta Live Tables, consultez l’article Utiliser Unity Catalog avec vos pipelines Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Consultez Qu’est-ce qu’Auto Loader ? et Syntaxe SQL Auto Loader.

Avertissement

Si vous utilisez Auto Loader avec les notifications de fichiers, et si vous exécutez une actualisation complète pour votre pipeline ou votre table de streaming, vous devez nettoyer manuellement vos ressources. Vous pouvez utiliser CloudFilesResourceManager dans un notebook pour effectuer un nettoyage.

Charger des données à partir d’un bus de messages

Vous pouvez configurer des pipelines Delta Live Tables pour ingérer les données de bus de messages avec les tables de streaming. Databricks recommande de combiner les tables de streaming avec une exécution continue et une mise à l’échelle automatique améliorée afin de fournir l’ingestion la plus efficace possible pour un chargement à faible latence à partir des bus de messages. Consultez Optimiser l’utilisation des clusters des pipelines Delta Live Tables avec la mise à l’échelle automatique améliorée.

Par exemple, le code suivant configure une table de streaming pour ingérer des données provenant de Kafka :

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Vous pouvez écrire des opérations en aval en SQL pur pour effectuer des transformations en streaming sur ces données, comme dans l’exemple suivant :

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Pour obtenir un exemple d’utilisation avec Event Hubs, consultez Utiliser Azure Event Hubs comme source de données Delta Live Tables.

Consultez Configurer des sources de données de diffusion en continu.

Charger des données à partir de systèmes externes

Delta Live Tables prend en charge le chargement de données à partir de n’importe quelle source de données prise en charge par Azure Databricks. Consultez Se connecter aux sources de données. Vous pouvez également charger des données externes à l’aide de Lakehouse Federation pour les sources de données prises en charge. Dans la mesure où Lakehouse Federation nécessite Databricks Runtime 13.3 LTS ou une version ultérieure, pour utiliser Lakehouse Federation, votre pipeline doit être configuré de manière à utiliser le canal de préversion.

Certaines sources de données n’ont pas de prise en charge équivalente en SQL. Si vous ne pouvez pas utiliser Lakehouse Federation avec l’une de ces sources de données, vous pouvez utiliser un notebook Python autonome pour ingérer les données à partir de la source. Ce notebook peut ensuite être ajouté en tant que bibliothèque source avec des notebooks SQL pour créer un pipeline Delta Live Tables. L’exemple suivant déclare une vue matérialisée pour accéder à l’état actuel des données dans une table PostgreSQL distante :

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Charger des jeux de données statiques ou de petite taille à partir du stockage d’objets cloud

Vous pouvez charger des jeux de données petits ou statiques à l’aide de la syntaxe de chargement Apache Spark. Delta Live Tables prend en charge tous les formats de fichiers pris en charge par Apache Spark sur Azure Databricks. Pour obtenir une liste complète, consultez Options de format de données.

Les exemples suivants montrent le chargement de données JSON pour créer des tables Delta Live Tables :

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Remarque

La construction SQL SELECT * FROM format.`path`; est commune à tous les environnements SQL sur Azure Databricks. Il s’agit du modèle recommandé pour l’accès direct aux fichiers à l’aide de SQL avec Delta Live Tables.

Accéder de manière sécurisée aux informations d’identification du stockage avec des secrets dans un pipeline

Vous pouvez utiliser les secrets Azure Databricks pour stocker les informations d’identification telles que les clés d’accès ou les mots de passe. Pour configurer le secret dans votre pipeline, utilisez une propriété Spark dans la configuration du cluster de paramètres de pipeline. Consultez Configurer vos paramètres de calcul.

L’exemple suivant utilise un secret pour stocker une clé d’accès requise pour lire les données d’entrée à partir d’un compte de stockage Azure Data Lake Storage Gen2 (ADLS Gen2) à l’aide du chargeur automatique. Vous pouvez utiliser cette même méthode pour configurer tout secret requis par votre pipeline, par exemple des clés AWS pour accéder à S3 ou le mot de passe pour un metastore Apache Hive.

Pour en savoir plus sur l’utilisation d’Azure Data Lake Storage Gen2, consultez Se connecter à Azure Data Lake Storage Gen2 et au service Stockage Blob.

Remarque

Vous devez ajouter le préfixe spark.hadoop. à la clé de configuration spark_conf qui définit la valeur du secret.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> par le nom du compte de stockage ADLS Gen2.
  • <scope-name> avec le nom de l'étendue secrète d'Azure Databricks.
  • <secret-name> avec le nom de la clé contenant la clé d'accès au compte de stockage Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> avec le nom du conteneur de compte de stockage Azure qui stocke les données d’entrée.
  • <storage-account-name> par le nom du compte de stockage ADLS Gen2.
  • <path-to-input-dataset> avec le chemin d’accès au jeu de données d’entrée.

Charger des données d’Azure Event Hubs

Azure Event Hubs est un service de flux de données qui fournit une interface compatible Apache Kafka. Vous pouvez utiliser le connecteur Kafka Structured Streaming, inclus dans le runtime Delta Live Tables, pour charger des messages à partir d’Azure Event Hubs. Pour en savoir plus sur le chargement et le traitement de messages à partir d’Azure Event Hubs, consultez Utiliser Azure Event Hubs comme source de données Delta Live Tables.