Partager via


Transférer des données de diffusion en continu dans lakehouse avec la diffusion de flux structuré Spark

La diffusion de flux structuré est un moteur de traitement de flux de données évolutif et tolérant aux pannes basé sur Spark. Spark s’occupe d’exécuter l’opération de diffusion en continu de manière incrémentielle et continue à mesure que les données continuent d’arriver.

La diffusion de flux structuré est devenu disponible dans Spark 2.2. Depuis lors, il s’agit de l’approche recommandée pour la diffusion de flux de données. Le principe fondamental du flux structuré consiste à traiter un flux de données en direct comme une table où de nouvelles données sont toujours ajoutées en continu, comme une nouvelle ligne dans une table. Il existe quelques sources de fichiers de diffusion en continu intégrées définies, telles que CSV, JSON, ORC, Parquet, et une prise en charge intégrée des services de messagerie comme Kafka et Event Hubs.

Cet article fournit des informations sur la manière d’optimiser le traitement et l’ingestion d’événements via le streaming structuré Spark dans des environnements de production à haut débit. Les approches suggérées sont les suivantes :

  • Optimisation du débit de diffusion de flux de données
  • Optimisation des opérations d’écriture dans la table delta et
  • Traitement par lot des événements

Définitions de tâches Spark et notebooks Spark

Les notebooks Spark sont un excellent outil pour valider des idées et expérimenter afin d’obtenir des insights à partir de vos données ou de votre code. Les notebooks sont largement utilisés pour la préparation de données, la visualisation, l’apprentissage automatique et d’autres scénarios en lien avec le Big Data. Les définitions de tâche Spark sont des travaux orientés code non interactifs qui s’exécutent sur un cluster Spark pendant de longues périodes. Les définitions de tâche Spark fournissent robustesse et disponibilité.

Les notebooks Spark sont une excellente source pour tester la logique de votre code et répondre à toutes les exigences de l’entreprise. Toutefois, pour assurer leur exécution continue dans un scénario de production, les définitions de tâches Spark avec la stratégie de réessai activée sont la meilleure solution.

Stratégie de nouvelles tentatives pour les définitions de tâches Spark

Dans Microsoft Fabric, l’utilisateur peut définir une stratégie de nouvelles tentatives pour les travaux de définition de tâche Spark. Bien que le script dans le travail puisse être infini, l’infrastructure qui exécute le script peut entraîner un problème nécessitant l’arrêt du travail. Ou le travail pourrait être éliminé en raison des besoins de mise à jour corrective de l’infrastructure sous-jacente. La stratégie de nouvelles tentatives permet à l’utilisateur de définir des règles de redémarrage automatique du travail en cas d’arrêt en raison de problèmes sous-jacents. Les paramètres spécifient la fréquence à laquelle le travail doit être redémarré, jusqu’à une infinité de nouvelles tentatives et définissent le temps entre les nouvelles tentatives. De cette façon, les utilisateurs peuvent s’assurer que leurs travaux de définition de tâche Spark continuent de s’exécuter à l’infini jusqu’à ce que l’utilisateur décide de les arrêter.

Sources de diffusion en continu

La configuration de la diffusion en continu avec Event Hubs nécessite une configuration de base, qui inclut le nom de l’espace de noms Event Hubs, le nom du hub, le nom de la clé d’accès partagé et le groupe de consommateurs. Un groupe de consommateurs est une vue de l’ensemble d’un Event Hub. Cela permet à plusieurs applications consommatrices d’avoir chacune une vue distincte de l’eventstream et de lire le flux indépendamment à leur propre rythme et avec leurs propres décalages.

Les partitions sont un élément essentiel de la capacité à traiter un volume élevé de données. Un seul processeur a une capacité limitée de traitement des événements par seconde, tandis que plusieurs processeurs peuvent faire un meilleur travail lorsqu’ils sont exécutés en parallèle. Les partitions permettent de traiter de grands volumes d’événements en parallèle.

Si trop de partitions sont utilisées avec un faible taux d’ingestion, les lecteurs de partitions traitent une petite partie de ces données, ce qui entraîne un traitement non optimal. Le nombre idéal de partitions dépend directement du taux de traitement souhaité. Si vous souhaitez redimensionner votre traitement des événements, envisagez d’ajouter des partitions. Il n’existe aucune limite de débit spécifique sur une partition. En revanche, le débit cumulé dans votre espace de noms est limité par le nombre d’unités de débit. Lorsque vous augmentez le nombre d’unités de débit de votre espace de noms, vous souhaiterez peut-être avoir des partitions supplémentaires pour permettre aux lecteurs fonctionnant simultanément d’atteindre leur débit maximal.

Il est recommandé d’examiner et de tester le meilleur nombre de partitions pour votre scénario de débit. Mais il est courant de voir des scénarios avec un débit élevé utilisant 32 partitions ou plus.

Le connecteur Azure Event Hubs pour Apache Spark (azure-event-hubs-spark) est recommandé pour connecter l’application Spark à Azure Event Hubs.

Lakehouse en tant que récepteur de streaming

Delta Lake est une couche de stockage open source qui fournit des transactions ACID (atomicité, cohérence, isolation et durabilité) en plus des solutions Data Lake Storage. Delta Lake prend également en charge la gestion des métadonnées évolutives, l’évolution du schéma, le voyage dans le temps (contrôle de version des données), le format ouvert et d’autres fonctionnalités.

Dans l'ingénierie des données Fabric, Delta Lake est utilisé pour :

  • Insérer, mettre à jour et supprimer facilement des données à l'aide de Spark SQL.
  • Compacter les données pour réduire le temps consacré à interroger les données.
  • Afficher l’état des tables avant et après l’exécution des opérations.
  • Récupérer un historique des opérations effectuées sur les tables.

Delta est ajouté en tant que l’un des formats de récepteurs de sortie possibles utilisés dans writeStream. Pour plus d’informations sur les récepteurs de sortie existants, consultez Guide de programmation Spark Structured Streaming.

L’exemple suivant montre comment il est possible de diffuser des données dans Delta Lake.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

À propos du code extrait dans l’exemple :

  • format() est l’instruction qui définit le format de sortie des données.
  • outputMode() définit la façon dont les nouvelles lignes de la diffusion en continu sont écrites (c’est-à-dire, ajouter, remplacer).
  • toTable() conserve les données diffusées en continu dans une table Delta créée à l’aide de la valeur passée en tant que paramètre.

Optimisation des écritures Delta

Le partitionnement des données est un élément essentiel de la création d’une solution de diffusion en continu robuste : le partitionnement améliore l’organisation des données et également le débit. Les fichiers sont facilement fragmentés après les opérations Delta, ce qui entraîne un trop grand nombre de petits fichiers. Et les fichiers trop volumineux sont également un problème, en raison du temps d’écriture élevé sur le disque. Le défi avec le partitionnement des données est de trouver l’équilibre approprié qui aboutit à des tailles de fichiers optimales. Spark prend en charge le partitionnement en mémoire et sur disque. Les données correctement partitionnées peuvent offrir les meilleures performances lors de la persistance des données dans Delta Lake et de l’interrogation des données depuis Delta Lake.

  • Lorsque vous partitionnez des données sur disque, vous pouvez choisir comment partitionner les données en fonction des colonnes à l’aide de partitionBy(). partitionBy() est une fonction utilisée pour partitionner un modèle sémantique volumineux en fichiers plus petits en fonction d’une ou de plusieurs colonnes fournies lors de l’écriture sur disque. Le partitionnement est un moyen d’améliorer les performances des requêtes lors de l’utilisation d’un modèle sémantique volumineux. Évitez de choisir une colonne qui génère des partitions trop petites ou trop grandes. Définissez une partition basée sur un ensemble de colonnes avec une bonne cardinalité et fractionnez les données en fichiers de taille optimale.
  • Le partitionnement des données en mémoire peut être effectué à l’aide des transformations repartition() ou coalesce(), qui distribuent des données sur plusieurs nœuds Worker et créent plusieurs tâches qui peuvent lire et traiter des données en parallèle à l’aide des principes de base du jeu de données distribué résilient (RDD). Il permet de diviser le modèle sémantique en partitions logiques, qui peuvent être calculées sur différents nœuds du cluster.
    • repartition() permet d’augmenter ou de diminuer le nombre de partitions en mémoire. Cette fonction réorganise toutes les données sur le réseau et les répartit de manière équilibrée sur toutes les partitions.
    • coalesce() est utilisé uniquement pour réduire efficacement le nombre de partitions. Il s’agit d’une version optimisée de repartition() où le déplacement des données entre toutes les partitions est plus faible en utilisant coalesce().

La combinaison des deux approches de partitionnement est une bonne solution dans un scénario avec un débit élevé. repartition() crée un nombre spécifique de partitions en mémoire, tandis que partitionBy() écrit des fichiers sur le disque pour chaque partition de mémoire et colonne de partitionnement. L’exemple suivant illustre l’utilisation des deux stratégies de partitionnement dans le même travail Spark : les données sont d’abord fractionnées en 48 partitions en mémoire (en supposant que nous avons un total de 48 cœurs de processeur), puis partitionnées sur disque en fonction de deux colonnes existantes dans la charge utile.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Écriture optimisée

Une autre option pour optimiser les écritures dans Delta Lake consiste à utiliser l’écriture optimisée. L’écriture optimisée est une fonctionnalité facultative qui améliore la façon dont les données sont écrites dans la table Delta. Spark fusionne ou fractionne les partitions avant d’écrire les données, ce qui optimise le débit des données écrites sur le disque. Toutefois, cela entraîne une lecture aléatoire complète, ce qui peut provoquer une détérioration des performances pour certaines charges de travail. Les travaux utilisant coalesce() et/ou repartition() pour partitionner des données sur disque peuvent être refactorisés pour commencer à utiliser l’écriture optimisée à la place.

Le code suivant est un exemple d’utilisation de l’écriture optimisée. Notez que partitionBy() est toujours utilisé.

spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Événements de traitement par lot

Pour réduire le nombre d’opérations et améliorer le temps d’ingestion des données dans Delta Lake, le traitement par lot des événements est une alternative pratique.

Les déclencheurs définissent la fréquence à laquelle une requête en streaming doit être exécutée (déclenchée) et émettre de nouvelles données. Leur configuration définit un intervalle de temps de traitement périodique pour les microlots, accumulant les données et regroupant les événements en quelques opérations persistantes, au lieu d'écrire sur le disque tout le temps.

L’exemple suivant montre une requête de diffusion en continu où les événements sont traités périodiquement par intervalles d’une minute.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

L’avantage de combiner le traitement par lot des événements dans les opérations d’écriture dans la table Delta est que cela permet a création de fichiers Delta plus volumineux avec plus de données, ce qui évite les petits fichiers. Vous devez analyser la quantité de données ingérées et trouver le meilleur temps de traitement pour optimiser la taille des fichiers Parquet créés par la bibliothèque Delta.

Monitoring

Spark 3.1 et versions ultérieures ont une interface utilisateur de diffusion de flux structuré intégrée contenant les métriques de diffusion en continu suivantes :

  • Taux d’entrée
  • Taux de traitement
  • Lignes d’entrée
  • Durée du traitement
  • Durée de l’opération