Modifier

Partager via


Création d’un Delta Lake prenant en charge les requêtes ad hoc dans la réservation en ligne de loisirs et de voyages

Hubs d'événements Azure
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Cette architecture offre un exemple de Delta Lake pour la réservation de voyages, où de grandes quantités de documents bruts sont générées à une fréquence élevée.

Apache® et Apache Spark™ sont soit des marques déposées, soit des marques commerciales d’Apache Software Foundation aux États-Unis et/ou dans d’autres pays. L’utilisation de ces marques n’implique aucune approbation de l’Apache Software Foundation.

Architecture

Diagramme de l’architecture de Delta Lake.

Téléchargez un fichier Visio de cette architecture.

Les scénarios de réservation de loisirs et de voyages peuvent générer de grandes quantités de documents bruts à une fréquence élevée. Toutefois, il n’est pas forcément nécessaire d’indexer la totalité du contenu de ces documents. Par exemple, les utilisateurs peuvent avoir besoin d’effectuer une recherche à l’aide d’un ID de transaction connu ou d’un nom de client à une certaine date, afin de récupérer un ensemble de documents intéressants.

Dataflow

Le concept qui sous-tend cette architecture consiste à découpler des données nues les métadonnées utiles pour la recherche :

  • Seules les métadonnées sont indexées au sein d’un service requêtable (par exemple Spark), tandis que les données réelles sont stockées dans un lac de données.
  • Les documents bruts d’un lac de données sont liés à des métadonnées indexées suivant leur chemin.
  • Lorsqu’une requête est exécutée sur des documents, le service effectue une recherche parmi les métadonnées des documents. En retour, les documents en question sont récupérés à partir du lac de données au moyen de leur chemin.

Cette solution permet de réduire considérablement les coûts et d’augmenter le niveau de performance, car les métadonnées ne représentent qu’une fraction de l’ensemble de l’espace de données (par exemple, plusieurs pétaoctets de documents bruts peuvent être décrits par quelques dizaines de gigaoctets de métadonnées concises).

En outre, la gestion de la fusion de la profondeur historique et des exigences de temps réel dans un système uniforme, facile à gérer et très performant constitue un défi courant de ce type de scénario. L’architecture Delta Lake permet de relever ce défi.

Composants

Azure App Service est un service PaaS (platform as a service) de création et d’hébergement d’applications dans des machines virtuelles managées. App Service, qui gère l’infrastructure de calcul sous-jacente sur laquelle les applications s’exécutent, effectue le monitoring des quotas d’utilisation des ressources et des métriques d’application, la journalisation des informations de diagnostic et l’émission d’alertes en fonction des métriques.

Azure Data Factory est le service ETL (Extract, Transform, and Load) cloud d’Azure qui permet le scale-out de l’intégration et la transformation de données serverless. Il offre une interface utilisateur sans code pour une création intuitive et une supervision et une gestion dans une seule et même vue. Vous pouvez également effectuer un lift-and-shift de packages SQL Server Integration Services (SSIS) existants vers Azure et les exécuter avec une compatibilité complète dans Azure Data Factory.

Azure Data Lake Storage Gen2 est un ensemble de fonctionnalités dédiées à l’analytique du Big Data et conçues sur le Stockage Blob Azure. Data Lake Storage Gen2 fait converger les fonctionnalités d’Azure Data Lake Storage Gen1 avec le service Stockage Blob Azure. Par exemple, Data Lake Storage Gen2 fournit une sémantique du système de fichiers, une sécurité au niveau des fichiers et la mise à l’échelle. Comme ces fonctionnalités sont basées sur le Stockage Blob, vous bénéficiez également d’un stockage hiérarchisé à faible coût avec des fonctionnalités de haute disponibilité et de récupération d’urgence.

Azure Event Hubs est un service complètement managé d’ingestion des données en temps réel à la fois simple, fiable et évolutif. Diffusez en continu des millions d’événements par seconde à partir de n’importe quelle source pour créer des pipelines de données dynamiques et répondre immédiatement aux défis commerciaux.

Azure Databricks est une plateforme d’analytique données basée sur Apache Spark et optimisée pour Microsoft Azure Cloud Services. Azure Databricks offre trois environnements pour développer des applications gourmandes en données : Databricks SQL, Databricks Data Science & Engineering et Databricks Machine Learning.

Autres solutions

À la place des métadonnées, il est possible d’indexer toutes les données brutes dans un service qui offre des fonctionnalités de requête, par exemple Azure Databricks, Azure Synapse Analytics, la Recherche cognitive Azure ou Azure Data Explorer. Cette approche est plus immédiate. Néanmoins, prêtez attention à l’effet combiné de la taille des données, des exigences de niveau de performance et de la fréquence de mise à jour, en particulier du point de vue du coût.

Contrairement au Delta Lake, une architecture Lambda permet de conserver les données en temps réel dans un autre référentiel que les données historiques. Le client exécute la logique de façon à rendre les requêtes hétérogènes transparentes pour l’utilisateur. L’avantage de cette solution est qu’elle permet d’utiliser un plus grand ensemble de services (par exemple Azure Stream Analytics et Azure SQL Database), mais l’architecture devient plus complexe et le codebase plus coûteux à gérer.

Spark est distribué avec Azure Databricks, Azure Synapse Analytics et Azure HDInsight. Par conséquent, cette architecture peut être implémentée avec n’importe lequel de ces services de données Azure, de préférence avec une version récente de Spark prenant en charge la version 0.8 ou 1.0 de Delta Lake.

Détails du scénario

Dans les scénarios de réservation de loisirs et de voyages, la visibilité des données brutes est importante pour plusieurs acteurs. Les équipes du support technique supervisent les diagnostics en temps réel pour effectuer un monitoring en continu du traitement transactionnel et réagir rapidement aux problèmes indésirables. Les ingénieurs de données contrôlent l’exportation des données pour permettre leur examen par les parties prenantes et alimenter l’analytique en temps réel. Les membres du service clientèle ont besoin de données historiques et de données récentes pour traiter les demandes et réclamations des clients. Enfin, le service juridique veille au respect des responsabilités en matière de conformité et à l’exécution des actions légales. Ces types d’exigences sont caractéristiques des places de marché qui agrègent des fournisseurs externes et gèrent les achats utilisateurs. Par exemple, les systèmes de réservation de loisirs et de voyages suppriment des intermédiaires entre utilisateurs et fournisseurs de services pour la recherche de services, l’agrégation d’offres intéressantes des fournisseurs et la gestion des réservations des utilisateurs.

Diagramme d’une place de marché avec des fournisseurs de services et des utilisateurs B2B et B2C.

Cas d’usage potentiels

Cette architecture s’avère idéale pour les secteurs du voyage et de l’hôtellerie. Elle est applicable aux scénarios suivants :

  • Récupération rapide en temps réel (par exemple pour les diagnostics) ou historique (pour la conformité) des documents bruts dans leur format d’origine
  • Gestion de pétaoctets de données
  • Garantie de performances de l’ordre de quelques secondes pour les diagnostics en temps réel
  • Approche unifiée des diagnostics en temps réel, des requêtes historiques et des flux d’analytique
  • Alimentation de l’analytique en temps réel en aval
  • Contrôle des coûts
  • Internalisation des données sous forme de documents bruts (par exemple des fichiers JSON, XML ou CSV)
  • Lorsqu’une fraction des données suffit pour décrire des requêtes
  • Lorsque les utilisateurs souhaitent récupérer des documents bruts complets
  • Lorsque la taille totale des données impliquerait une mise à l’échelle du système supérieure à votre prix cible

Cette architecture peut ne pas convenir dans les cas suivants :

  • Les données sont internalisées sous forme de recordsets.
  • Les utilisateurs doivent exécuter des requêtes d’analytique.
  • Les utilisateurs sont prêts à utiliser leur propre outil décisionnel packagé.
  • La taille des données n’est pas un problème du point de vue des coûts.

Les documents bruts ne sont pas nécessairement requis.

Considérations

Ces considérations implémentent les piliers d’Azure Well-Architected Framework qui est un ensemble de principes directeurs qui permettent d’améliorer la qualité d’une charge de travail. Pour plus d'informations, consultez Microsoft Azure Well-Architected Framework.

Efficacité des performances

L’efficacité des performances est la capacité de votre charge de travail à s’adapter à la demande des utilisateurs de façon efficace. Pour plus d’informations, consultez Vue d’ensemble du pilier d’efficacité des performances.

Les utilisateurs effectuent une connexion à deux tronçons pour accéder aux données. Ils interrogent d’abord les métadonnées, puis récupèrent l’ensemble de documents souhaité. Il peut être difficile de réutiliser des ressources clientes existantes ou packagées.

Azure Data Lake Storage Gen2 fournit trois niveaux d’accès : chaud, froid et archive. Dans les scénarios où les documents sont récupérés occasionnellement, le niveau de performances froid devrait garantir des performances similaires à celles du niveau de performance chaud, mais avec l’avantage de réduire les coûts. Dans les scénarios où la probabilité de récupération est plus élevée avec des données plus récentes, envisagez de mélanger les niveaux froid et chaud. Le stockage de niveau archive peut également éviter d’avoir recours à la suppression matérielle, et réduire la taille des données en conservant uniquement les informations significatives ou des données plus agrégées.

Le lac de données gère potentiellement plusieurs pétaoctets de données. De ce fait, les stratégies de conservation des données s’appliquent généralement. Les solutions de gouvernance des données doivent être utilisées pour gérer le cycle de vie des données, par exemple quand déplacer les anciennes données entre les niveaux de stockage chaud et froid, quand supprimer ou archiver les anciennes données et quand agréger des informations dans une solution d’analytique en aval.

Demandez-vous si cette approche peut fonctionner avec les scénarios d’analytique en aval. Bien que cet exemple de charge de travail ne soit pas destiné à l’analytique, il convient pour alimenter l’analytique en temps réel en aval, tandis que les scénarios de traitement par lots peuvent être approvisionnés à partir du lac de données.

Extensibilité

Azure Event Hubs est très polyvalent lorsqu’il s’agit de découpler un système transactionnel qui génère des documents bruts à partir d’un système de diagnostic et de conformité. Il est facile à implémenter dans les architectures déjà établies et, enfin, facile à utiliser. Il se peut toutefois que le système transactionnel utilise déjà le modèle de diffusion en continu pour traiter les documents entrants. Dans ce cas, vous devrez probablement intégrer dans l’application de diffusion en continu une logique de gestion des diagnostics et de la conformité sous la forme d’un sous-flux.

DevOps

Pour déployer automatiquement les services utilisés dans cet exemple de charge de travail, il est préférable d’utiliser des processus d’intégration continue et de déploiement continu (CI/CD). Envisagez d’utiliser une solution comme Azure DevOps ou GitHub Actions.

Optimisation des coûts

L’optimisation des coûts consiste à examiner les moyens de réduire les dépenses inutiles et d’améliorer l’efficacité opérationnelle. Pour plus d’informations, consultez Vue d’ensemble du pilier d’optimisation des coûts.

En règle générale, utilisez la calculatrice de prix Azure pour estimer les coûts. Pour plus d’informations sur les coûts, consultez la section correspondante dans Microsoft Azure Well-Architected Framework.

Déployer ce scénario

Dans l’exemple d’architecture suivant, nous supposons qu’un ou plusieurs espaces de noms Azure Event Hubs contiennent des documents bruts structurés (par exemple des fichiers JSON ou XML). Toutefois, le type et le format réels des documents et des services sources ainsi que leur type d’intégration dépendent fortement du scénario et de l’architecture.

Diffusion en continu

Avec Spark Structured Streaming, les données brutes sont extraites, décompressées, analysées et converties en données tabulaires dans un DataFrame de diffusion en continu.

L’extrait de code PySpark suivant permet de charger un DataFrame de diffusion en continu à partir d’Event Hubs :

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

L’extrait de code suivant permet de traiter le DataFrame de diffusion en continu. Il commence si nécessaire par décompresser le message Event Hubs, puis il analyse sa structure JSON dans un format tabulaire. Ce code constitue un exemple qui doit être adapté en fonction du scénario :

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

Le traitement réel des données se compose de deux étapes. La première consiste à extraire les métadonnées pour faciliter la recherche des documents bruts après le traitement. Les métadonnées réelles dépendent du cas d’usage. On peut néanmoins citer des exemples généralisables, à savoir les dates et identificateurs appropriés, les types de documents, le service source et le type de catégorie :

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

La deuxième étape de traitement consiste à générer un chemin vers Azure Data Lake Storage Gen2, où seront stockés les documents bruts :

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Ingestion des métadonnées dans un Delta Lake

Les métadonnées sont écrites dans une table Delta offrant des fonctionnalités de requête en temps réel. Les écritures sont diffusées en continu dans une mémoire tampon, et les requêtes envoyées à la table peuvent fusionner les résultats de la mémoire tampon avec ceux de la partie historique de la table.

L’extrait de code suivant montre comment définir une table Delta dans le metastore et la partitionner en fonction de la date :

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Notez que le champ transactionId est numérique. Les messages classiques qui transmettent des systèmes distribués peuvent utiliser plutôt des GUID pour identifier les transactions de manière unique. Toutefois, les types de données numériques offrent un niveau supérieur de performance des requêtes sur la plupart des plateformes de données.

Il peut être difficile d’attribuer un identificateur de transaction unique en raison de la nature distribuée des plateformes de données cloud (comme Spark). Une approche utile consiste à baser cet identificateur de transaction sur un identificateur de partition (par exemple le numéro de partition Event Hubs) et un numéro incrémentiel au sein de la partition. monotonically_increasing_id(), dans Azure Databricks, constitue un exemple de cette solution.

L’extrait de code suivant montre comment ajouter le flux comportant les métadonnées des documents bruts à la table Delta :

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Notez que le partitionnement est géré lors de l’écriture du flux en fonction du schéma de la table.

Ingestion des données dans un lac de données

Les documents bruts réels sont écrits au sein d’un niveau de performance de stockage approprié dans Azure Data Lake Storage Gen2.

L’extrait de code suivant montre une fonction simple servant à charger un fichier vers Azure Data Lake Storage Gen2. La méthode foreach utilisée dans la classe DataStreamWriter permet de charger le fichier hébergé dans chacun des enregistrements du DataFrame de diffusion en continu :

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Client

Le client peut être une application web personnalisée qui utilise des métadonnées pour récupérer le chemin des documents à partir de la table Delta avec des instructions SQL standard et, en retour, le document réel à partir du lac de données avec des API Azure Data Lake Storage Gen2 standard.

L’extrait de code suivant, par exemple, montre comment récupérer le chemin de tous les documents d’une certaine transaction :

select * from metadata where transactionId = '123456'

Étapes suivantes

Consultez l’aide associée en matière d’architecture :

Reportez-vous à ces architectures connexes :