Exécuter votre première charge de travail Structured Streaming

Cet article fournit des exemples de code et une explication des concepts de base nécessaires pour exécuter vos premières requêtes Structured Streaming sur Azure Databricks. Vous pouvez utiliser Structured Streaming pour des charges de travail de traitement en quasi-temps réel et incrémentiel.

Structured Streaming est l’une des technologies qui alimentent les tables de diffusion en continu dans Delta Live Tables. Databricks recommande d’utiliser Delta Live Tables pour toutes les nouvelles charges de travail d’ETL, d’ingestion et Structured Streaming. Consultez l’article Qu’est-ce que Delta Live Tables ?.

Remarque

Alors que Delta Live Tables fournit une syntaxe légèrement modifiée pour déclarer des tables de diffusion en continu, la syntaxe générale pour la configuration des lectures et des transformations de diffusion en continu s’applique à tous les cas d’usage de diffusion en continu sur Azure Databricks. Delta Live Tables simplifie également la diffusion en continu en gérant les informations d’état, les métadonnées et de nombreuses configurations.

Lecture à partir d’un flux de données

Vous pouvez utiliser Structured Streaming pour ingérer de manière incrémentielle des données à partir de sources de données prises en charge. Voici quelques-unes des sources de données les plus courantes utilisées dans les charges de travail Azure Databricks Structured Streaming :

  • Fichiers de données dans le stockage d’objets cloud
  • Bus de messages et files d’attente
  • Delta Lake

Databricks recommande d’utiliser Auto Loader pour l’ingestion de diffusion en continu à partir du stockage d’objets cloud. Auto Loader (le chargeur automatique) prend en charge la plupart des formats de fichiers pris en charge par Structured Streaming. Consultez Qu’est-ce que Auto Loader ?.

Chaque source de données fournit un certain nombre d’options pour spécifier comment charger des lots de données. Pendant la configuration du lecteur, vous devrez peut-être définir les principales options dans les catégories suivantes :

  • Options qui spécifient la source de données ou le format (par exemple, le type de fichier, les délimiteurs et le schéma).
  • Options qui configurent l’accès aux systèmes sources (par exemple, les paramètres de port et les informations d’identification).
  • Options qui spécifient où démarrer dans un flux (par exemple, les décalages Kafka ou la lecture de tous les fichiers existants).
  • Options qui contrôlent la quantité de données traitées dans chaque lot (par exemple, maximum de décalages, de fichiers ou d’octets par lot).

Utiliser Auto Loader pour lire les données de diffusion en continu à partir du stockage d’objets

L’exemple suivant illustre le chargement de données JSON avec le Auto Loader, qui utilise cloudFiles pour indiquer le format et les options. L’option schemaLocation active l’inférence et l’évolution du schéma. Collez le code suivant dans une cellule de notebook Databricks et exécutez la cellule pour créer un DataFrame de diffusion en continu nommé raw_df :

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Comme d’autres opérations de lecture sur Azure Databricks, la configuration d’une lecture de diffusion en continu ne charge pas réellement les données. Vous devez déclencher une action sur les données avant le début du flux.

Remarque

L’appel de display() sur un DataFrame de diffusion en continu démarre une tâche de diffusion en continu. Pour la plupart des cas d’usage Structured Streaming, l’action qui déclenche un flux doit écrire des données dans un récepteur. Consultez Préparation de votre code Structured Streaming pour la production.

Effectuer une transformation de diffusion en continu

Structured Streaming prend en charge la plupart des transformations disponibles dans Azure Databricks et Spark SQL. Vous pouvez même charger des modèles MLflow en tant que fonctions définies par l’utilisateur et effectuer des prédictions de diffusion en continu en tant que transformation.

L’exemple de code suivant effectue une transformation simple pour enrichir les données JSON ingérées avec des informations supplémentaires à l’aide des fonctions Spark SQL :

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

Le transformed_df résultant contient des instructions de requête pour charger et transformer chaque enregistrement à mesure qu’il arrive dans la source de données.

Remarque

Structured Streaming traite les sources de données comme des jeux de données illimités ou infinis. Par conséquent, certaines transformations ne sont pas prises en charge dans les charges de travail Structured Streaming, car elles nécessitent le tri d’un nombre infini d’éléments.

La plupart des agrégations et de nombreuses jointures nécessitent la gestion des informations d’état avec des filigranes, des fenêtres et le mode de sortie. Consultez Appliquer des filigranes pour contrôler les seuils de traitement des données.

Écrire dans un récepteur de données

Un récepteur de données est la cible d’une opération d’écriture de diffusion en continu. Les récepteurs courants utilisés dans les charges de travail de diffusion en continu Azure Databricks sont les suivants :

  • Delta Lake
  • Bus de messages et files d’attente
  • Bases de données clé-valeur

Comme pour les sources de données, la plupart des récepteurs de données fournissent plusieurs options pour contrôler la façon dont les données sont écrites dans le système cible. Pendant la configuration de l’enregistreur, vous devrez peut-être définir les principales options dans les catégories suivantes :

  • Mode de sortie (ajouter par défaut).
  • Un emplacement de point de contrôle (requis pour chaque enregistreur).
  • Intervalles de déclencheur ; consultez Configurer les intervalles de déclencheur Structured Streaming.
  • Options qui spécifient le récepteur de données ou le format (par exemple, le type de fichier, les délimiteurs et le schéma).
  • Options qui configurent l’accès aux systèmes cibles (par exemple, les paramètres de port et les informations d’identification).

Effectuer une écriture par lot incrémentielle dans Delta Lake

L’exemple suivant écrit dans Delta Lake à l’aide d’un chemin d’accès et d’un point de contrôle de fichier spécifiés.

Important

Veillez toujours à spécifier un emplacement de point de contrôle unique pour chaque enregistreur de diffusion en continu que vous configurez. Le point de contrôle fournit l’identité unique de votre flux, en suivant tous les enregistrements traités et les informations d’état associés à votre requête de diffusion en continu.

Le paramètre availableNow du déclencheur indique à Structured Streaming de traiter tous les enregistrements précédemment non traités à partir du jeu de données source, puis d’arrêter, afin de pouvoir exécuter en toute sécurité le code suivant sans vous soucier de quitter un flux en cours d’exécution :

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

Dans cet exemple, aucun nouvel enregistrement n’arrive dans notre source de données. Par conséquent, la répétition de l’exécution de ce code n’ingère pas de nouveaux enregistrements.

Avertissement

L’exécution de Structured Streaming peut empêcher l’arrêt automatique causé par l’arrêt des ressources de calcul. Pour éviter des coûts inattendus, veillez à arrêter les requêtes de diffusion en continu.

Préparation de votre code Structured Streaming pour la production

Databricks recommande d’utiliser Delta Live Tables pour la plupart des charges de travail Structured Streaming. Les recommandations suivantes fournissent un point de départ pour la préparation des charges de travail Structured Streaming pour la production :

  • Supprimez le code inutile des notebooks qui retourne des résultats, tels que display et count.
  • N’exécutez pas de charges de travail Structured Streaming sur des clusters interactifs ; planifiez toujours des flux en tant que travaux.
  • Pour faciliter la récupération automatique des travaux de diffusion en continu, configurez les travaux avec un nombre infini de nouvelles tentatives.
  • N’utilisez pas la mise à l’échelle automatique pour les charges de travail avec Structured Streaming.

Pour davantage de recommandations, consultez Considérations relatives à la production pour Structured Streaming.

Lire des données à partir de Delta Lake, transformer et écrire dans Delta Lake

Delta Lake offre une prise en charge étendue de l’utilisation de Structured Streaming à la fois en tant que source et récepteur. Voir Lectures et écritures en diffusion en continu sur des tables Delta.

L’exemple suivant illustre l’exemple de syntaxe permettant de charger de manière incrémentielle tous les nouveaux enregistrements d’une table Delta, de les joindre à un instantané d’une autre table Delta et de les écrire dans une table Delta :

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

Vous devez disposer des autorisations appropriées configurées pour lire les tables sources et écrire dans les tables cibles et l’emplacement de point de contrôle spécifié. Renseignez tous les paramètres indiqués avec les signes inférieur et supérieur (<>) à l’aide des valeurs pertinentes pour vos sources de données et récepteurs.

Remarque

Delta Live Tables fournit une syntaxe entièrement déclarative pour créer des pipelines Delta Lake et gère automatiquement des propriétés telles que des déclencheurs et des points de contrôle. Consultez l’article Qu’est-ce que Delta Live Tables ?.

Lire des données à partir de Kafka, transformer et écrire dans Kafka

Apache Kafka et d’autres bus de messagerie fournissent une latence parmi les plus faibles disponibles pour les jeux de données volumineux. Vous pouvez utiliser Azure Databricks pour appliquer des transformations aux données ingérées à partir de Kafka, puis réécrire des données dans Kafka.

Remarque

L’écriture de données dans le stockage d’objets cloud ajoute une surcharge de latence supplémentaire. Si vous souhaitez stocker des données à partir d’un bus de messagerie dans Delta Lake, mais que vous avez besoin de la latence la plus faible possible pour les charges de travail de diffusion en continu, Databricks recommande de configurer des travaux de diffusion en continu distincts pour ingérer des données dans le lakehouse et d’appliquer des transformations en quasi-temps réel pour les récepteurs de bus de messagerie en aval.

L’exemple de code suivant illustre un modèle simple pour enrichir des données à partir de Kafka en les joignant à des données dans une table Delta, puis réécrire dans Kafka :

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Vous devez disposer des autorisations appropriées configurées pour l’accès à votre service Kafka. Renseignez tous les paramètres indiqués avec les signes inférieur et supérieur (<>) à l’aide des valeurs pertinentes pour vos sources de données et récepteurs. Consultez Traitement de flux avec Apache Kafka et Azure Databricks.