Bearbeiten

Freigeben über


Erstellen eines Delta Lake zur Unterstützung von Ad-hoc-Abfragen bei Online-Veranstaltungs- und Reisebuchungen

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Diese Architektur bietet ein Delta Lake-Beispiel für Reisebuchungen, in dem sehr häufig große Mengen an Rohdokumenten generiert werden.

Apache® und Apache Spark™ sind entweder eingetragene Marken oder Marken der Apache Software Foundation in den USA und/oder anderen Ländern. Die Verwendung dieser Markierungen impliziert kein Endorsement durch die Apache Software Foundation.

Aufbau

Diagramm der Delta Lake-Architektur

Laden Sie eine Visio-Datei dieser Architektur herunter.

In Szenarien von Veranstaltungs- und Reisebuchungen können sehr häufig große Mengen von Rohdokumenten generiert werden. Möglicherweise müssen Sie jedoch nicht den gesamten Inhalt dieser Dokumente indizieren. Beispielsweise müssen Benutzer möglicherweise nach einer bekannten Transaktions-ID oder nach einem Kundennamen an einem bestimmten Datum suchen, um eine Reihe von Dokumenten abzurufen, die für sie interessant sind.

Datenfluss

Das Konzept hinter dieser Architektur besteht darin, die für die Suche nützlichen Metadaten von bloßen Daten zu entkoppeln:

  • Nur Metadaten werden in einem abfragbaren Dienst (z. B. Spark) indiziert, während die tatsächlichen Daten in einem Data Lake gespeichert werden.
  • Rohdokumente in einem Data Lake werden über ihren Pfad mit indizierten Metadaten verknüpft.
  • Beim Abfragen von Dokumenten durchsucht der Dienst die Metadaten der Dokumente, und die tatsächlichen Dokumente werden wiederum anhand ihres Pfads aus dem Data Lake abgerufen.

Diese Lösung reduziert die Kosten erheblich und erhöht die Leistung, da Metadaten einen Bruchteil des gesamten Datenbestands bilden (z. B. können mehrere Petabyte an Rohdokumenten durch mehrere Gigabyte an kompakten Metadaten beschrieben werden).

Darüber hinaus ist die Verwaltung der Kombination aus Verlaufstiefe und Echtzeitanforderungen in einem einheitlichen, einfach zu verwaltenden Hochleistungssystem eine typische Herausforderung dieser Art von Szenario. Die Delta Lake-Architektur hat für diese Herausforderung eine Antwort.

Komponenten

Azure App Service ist eine Platform as a Service (PaaS) zum Erstellen und Hosten von Apps auf verwalteten virtuellen Computern. App Service verwaltet die zugrunde liegende Computeinfrastruktur, in der Ihre Apps ausgeführt werden, und ermöglicht das Überwachen von Kontingenten zur Ressourcenverwendung und App-Metriken, Protokollieren von Diagnoseinformationen und Anzeigen von Warnungen basierend auf Metriken.

Azure Data Factory ist der Clouddienst zum Extrahieren, Transformieren und Laden (ETL) von Azure für die serverlose Datenintegration und Datentransformation mit horizontaler Skalierung. Der Dienst bietet eine Benutzeroberfläche ohne Code für die intuitive Erstellung sowie Überwachung und Verwaltung über eine zentrale Konsole. Sie können vorhandene SQL Server Integration Services-Pakete (SSIS-Pakete) auch per Lift & Shift in Azure übertragen und bei voller Kompatibilität in Azure Data Factory ausführen.

Azure Data Lake Storage Gen2 setzt auf Azure Blob Storage auf und bietet eine Reihe von Funktionen für die Big Data-Analyse. Data Lake Storage Gen2 vereint die Funktionen von Azure Data Lake Storage Gen1 und Azure Blob Storage. Beispielsweise bietet Data Lake Storage Gen2 Dateisystemsemantik, Sicherheit auf Dateiebene und Skalierung. Da diese Funktionen auf Blob Storage basieren, profitieren Sie gleichzeitig von kostengünstigem, mehrstufigem Speicher mit Hochverfügbarkeit und Notfallwiederherstellungsfunktionen.

Azure Event Hubs ist ein vollständig verwalteter Echtzeitdienst zur Datenerfassung, der einfach, vertrauenswürdig und skalierbar ist. Streamen Sie Millionen von Ereignissen pro Sekunde aus beliebigen Quellen, um dynamische Datenpipelines zu erstellen, und reagieren Sie sofort auf geschäftliche Herausforderungen.

Azure Databricks ist eine Apache Spark-basierte Datenanalyseplattform, die für Microsoft Azure Cloud Services optimiert ist. Azure Databricks bietet drei Umgebungen für die Entwicklung datenintensiver Anwendungen: Databricks SQL, Databricks Data Science & Engineering und Databricks Machine Learning.

Alternativen

Als Alternative zur ausschließlichen Indizierung von Metadaten können Sie alle Rohdaten in einem Dienst indizieren, der Abfragefunktionen bietet, wie Azure Databricks, Azure Synapse Analytics, Azure Cognitive Search oder Azure Data Explorer. Dieser Ansatz ist direkter, beachten Sie jedoch die gemeinsamen Auswirkungen von Datengröße, Leistungsanforderungen und Aktualisierungshäufigkeit, insbesondere aus Kostensicht.

Im Gegensatz zur Verwendung eines Delta Lake werden bei Verwendung einer Lambda-Architektur Echtzeitdaten in einem anderen Repository gespeichert als Verlaufsdaten, und Ihr Client führt die Logik aus, um heterogene Abfragen für den Benutzer transparent zu machen. Der Vorteil dieser Lösung ist die größere Gruppe von Diensten, die Sie verwenden können (z. B. Azure Stream Analytics und Azure SQL-Datenbank), die Architektur wird jedoch komplexer, und die Verwaltung der Codebasis wird teurer.

Spark wird mit Azure Databricks, Azure Synapse Analytics und Azure HDInsight bereitgestellt. Daher könnte diese Architektur mit jedem dieser Azure-Datendienste implementiert werden, vorzugsweise mit einer aktuellen Spark-Version, die Delta Lake 0.8 oder 1.0 unterstützt.

Szenariodetails

Die Sichtbarkeit von Rohdaten in Szenarien von Veranstaltungs- und Reisebuchungen ist für mehrere Akteure von Bedeutung. Technische Supportteams überwachen Echtzeitdiagnosen, um die Transaktionsverarbeitung kontinuierlich zu überwachen und schnell auf unerwünschte Probleme zu reagieren. Datentechniker überwachen den Export von Daten für die Überprüfung durch Beteiligte und die Einfügung von Analysen in Echtzeit. Kundensupportteams benötigen Verlaufsdaten und aktuelle Daten, um Kundenanfragen und -beschwerden zu verarbeiten. Schließlich stellen juristische Teams sicher, dass Compliance-Pflichten erfüllt und rechtliche Schritte unternommen werden. Diese Arten von Anforderungen sind typisch für Marketplaces, die externe Anbieter aggregieren und Benutzerkäufe verwalten. Beispielsweise werden Benutzer und Dienstanbieter bei der Suche nach Diensten, beim Aggregieren aussagekräftiger Angebote von Anbietern und bei der Verwaltung von Benutzerreservierungen durch Veranstaltungs- und Reisebuchungssysteme ausgeschaltet.

Diagramm eines Marketplace mit Dienstanbietern und B2B- und B2C-Benutzern

Mögliche Anwendungsfälle

Diese Architektur eignet sich ideal für die Reise- und Hotelbranche. Sie kann für die folgenden Szenarien verwendet werden:

  • Schnelles Abrufen von Rohdokumenten in Echtzeit (z. B. für die Diagnose) oder historischen Rohdokumenten (aus Compliancegründen) im ursprünglichen Format
  • Verwalten von Petabytes von Daten
  • Garantieren der Leistung im Sekundenbereich für die Echtzeitdiagnose
  • Erzielung eines einheitlichen Ansatzes für Echtzeitdiagnosen, Verlaufsabfragen und die Einfügung von Analysen
  • Einfügung von Downstreamechtzeitanalysen
  • Kontrolle von Kosten
  • Einlagern von Daten als Rohdokumente (zum Beispiel JSON-, XML- oder CSV-Dateien)
  • Wenn ein Bruchteil der Daten ausreicht, um Abfragen zu beschreiben
  • Wenn Benutzer vollständige Rohdokumente abrufen möchten
  • Wenn die Gesamtdatengröße eine Skalierung des Systems über Ihren Zielpreis hinaus erfordern würde

Diese Architektur ist in den folgenden Fällen möglicherweise nicht geeignet:

  • Daten werden als Recordsets eingelagert.
  • Benutzer müssen Analysen ausführen.
  • Benutzer sind bereit, ihr eigenes gepacktes BI-Tool zu verwenden.
  • Die Größe der Daten ist aus Kostensicht keine Herausforderung.

Rohdokumente sind nicht unbedingt erforderlich.

Überlegungen

Diese Überlegungen beruhen auf den Säulen des Azure Well-Architected Frameworks, d. h. einer Reihe von Grundsätzen, mit denen die Qualität von Workloads verbessert werden kann. Weitere Informationen finden Sie unter Microsoft Azure Well-Architected Framework.

Effiziente Leistung

Leistungseffizienz ist die Fähigkeit Ihrer Workload, auf effiziente Weise eine den Anforderungen der Benutzer entsprechende Skalierung auszuführen. Weitere Informationen finden Sie unter Übersicht über die Säule „Leistungseffizienz“.

Benutzer führen einen doppelten Hop aus, um auf Daten zuzugreifen. Sie fragen zuerst Metadaten ab und rufen dann den gewünschten Satz von Dokumenten ab. Es ist möglicherweise schwierig, vorhandene oder gepackte Clientressourcen wiederzuverwenden.

Azure Data Lake Storage Gen2 bietet drei Zugriffsebenen: „Heiß“, „Kalt“ und „Archiv“. In Szenarien, in denen Dokumente gelegentlich abgerufen werden, sollte die kalte Leistungsstufe eine ähnliche Leistung wie die heiße Leistungsstufe gewährleisten, jedoch mit dem Vorteil geringerer Kosten. In Szenarien, in denen die Wahrscheinlichkeit des Abrufs bei neueren Daten höher ist, sollten Sie die kalte und heiße Ebene kombinieren. Die Verwendung von Speicher auf Archivebene könnte ebenfalls eine Alternative zum endgültigen Löschen darstellen und zudem die Größe von Daten reduzieren, indem nur aussagekräftige Informationen oder stärker aggregierte Daten gespeichert werden.

Der Data Lake verwaltet möglicherweise Petabytes von Daten, sodass allgemein Datenaufbewahrungsrichtlinien gelten. Datengovernance-Lösungen sollten verwendet werden, um den Lebenszyklus von Daten zu verwalten, z. B. wann alte Daten zwischen heißen und kalten Speicherebenen verschoben werden sollen, wann alte Daten gelöscht oder archiviert werden sollen und wann Informationen in eine Downstreamanalyselösung aggregiert werden sollen.

Überlegen Sie, wie dieser Ansatz in Downstreamanalyseszenarien funktionieren kann. Obwohl diese Beispielworkload nicht für Analysen vorgesehen ist, eignet sie sich für die Einfügung von Downstreamechtzeitanalysen, während Batchszenarien stattdessen aus dem Data Lake versorgt werden könnten.

Skalierbarkeit

Azure Event Hubs ist äußerst vielseitig, wenn es darum geht, ein Transaktionssystem zu entkoppeln, das Rohdokumente aus einem Diagnose- und Compliancesystem generiert; Azure Event Hubs ist zudem in bereits eingerichteten Architekturen einfach zu implementieren und letztendlich einfach zu verwenden. Möglicherweise verwendet das Transaktionssystem jedoch bereits das Streamingmuster, um eingehende Dokumente zu verarbeiten. In diesem Fall müssten Sie wahrscheinlich Logik für die Verwaltung von Diagnose und Compliance als Teilstream in die Streaminganwendung integrieren.

DevOps

Für die automatische Bereitstellung der verwendeten Dienste in dieser Beispielworkload ist es am besten, CI/CD-Prozesse (Continuous Integration und Continuous Deployment) zu verwenden. Ziehen Sie die Verwendung einer Lösung wie z. B. Azure DevOps oder GitHub Actions in Erwägung.

Kostenoptimierung

Bei der Kostenoptimierung geht es um die Suche nach Möglichkeiten, unnötige Ausgaben zu reduzieren und die Betriebseffizienz zu verbessern. Weitere Informationen finden Sie unter Übersicht über die Säule „Kostenoptimierung“.

Im Allgemeinen sollten Sie den Azure-Preisrechner verwenden, um Ihre Kosten zu ermitteln. Informationen zu weiteren Kostenaspekten finden Sie im Abschnitt zu Kosten im Microsoft Azure Well-Architected Framework.

Bereitstellen dieses Szenarios

In der folgenden Beispielarchitektur wird davon ausgegangen, dass mindestens ein Azure Event Hubs-Namespace strukturierte Rohdokumente (z. B. JSON- oder XML-Dateien) enthält. Der tatsächliche Typ und das tatsächliche Format von Dokumenten und Quelldiensten sowie deren Art der Integration hängen jedoch stark vom jeweiligen Szenario und der Architektur ab.

Streaming

Beim Spark Structured Streaming (strukturiertes Spark-Streaming) werden Rohdaten in einem Streamingdatenrahmen abgerufen, dekomprimiert, analysiert und in tabellarische Daten übersetzt.

Der folgende PySpark-Codeausschnitt wird verwendet, um einen Streamingdatenrahmen aus Event Hubs zu laden:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

Der folgende Codeausschnitt wird verwendet, um den Streamingdatenrahmen zu verarbeiten. Bei Bedarf wird zunächst die Event Hubs-Nachricht dekomprimiert und anschließend die JSON-Struktur in ein tabellarisches Format analysiert. Dieser Code ist ein Beispiel und sollte an Ihr bestimmtes Szenario angepasst werden:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

Die tatsächliche Datenverarbeitung besteht aus zwei Schritten. Zunächst müssen Metadaten extrahiert werden, um das Durchsuchen der Rohdokumente nach der Verarbeitung zu unterstützen. Die tatsächlichen Metadaten hängen vom Anwendungsfall ab, generalisierbare Beispiele wären jedoch relevante Datumsangaben und Bezeichner, Dokumenttypen, Quelldienst und jede Art von Kategorie:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

Der zweite Verarbeitungsschritt besteht darin, einen Pfad zu Azure Data Lake Storage Gen2 zu generieren, wo Sie Rohdokumente speichern werden:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Metadatenerfassung in einem Delta Lake

Metadaten werden in eine Deltatabelle geschrieben, die Echtzeitabfragefunktionen ermöglicht. Schreibvorgänge werden in einem Puffer gestreamt, und Abfragen an die Tabelle können Ergebnisse aus dem Puffer mit denen aus dem Verlaufsteil der Tabelle zusammenführen.

Der folgende Codeausschnitt zeigt, wie Sie eine Delta-Tabelle im Metastore definieren und nach Datum partitionieren:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Beachten Sie, dass das Feld „transactionId“ numerisch ist. Typische Nachrichten, die verteilte Systeme durchlaufen, können stattdessen GUIDs verwenden, um Transaktionen eindeutig zu identifizieren. Numerische Datentypen ermöglichen jedoch auf den meisten Datenplattformen eine höhere Abfrageleistung.

Das Zuweisen eines eindeutigen Transaktionsbezeichners kann angesichts der verteilten Struktur von Clouddatenplattformen (z. B. Spark) eine Herausforderung darstellen. Ein nützlicher Ansatz besteht darin, dass ein solcher Transaktionsbezeichner auf einem Partitionsbezeichner (z. B. die Event Hubs-Partitionsnummer) und einer inkrementellen Zahl innerhalb der Partition basiert. Ein Beispiel für diesen Ansatz ist monotonically_increasing_id() in Azure Databricks.

Der folgende Codeausschnitt zeigt, wie der Stream mit Metadaten von Rohdokumenten an die Deltatabelle angefügt wird:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Beachten Sie, dass die Partitionierung beim Schreiben des Streams gemäß dem Tabellenschema verwaltet wird.

Datenerfassung in einem Data Lake

Tatsächliche Rohdokumente werden in eine geeignete Speicherleistungsstufe in Azure Data Lake Gen2 geschrieben.

Der folgende Codeausschnitt zeigt eine einfache Funktion zum Hochladen einer Datei in Azure Data Lake Store Gen2. Mithilfe einer foreach-Methode in der DataStreamWriter-Klasse können Sie die Datei hochladen, die in jedem Datensatz des Streamingdatenrahmens gehostet wird:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Client

Der Client kann eine benutzerdefinierte Webanwendung sein, die Metadaten verwendet, um Dokumentpfade aus der Deltatabelle mit Standard-SQL-Anweisungen und wiederum das tatsächliche Dokument aus dem Data Lake mit Azure Data Lake Storage Gen2-Standard-APIs abzurufen.

Der folgende Codeausschnitt zeigt beispielsweise, wie die Pfade aller Dokumente in einer bestimmten Transaktion abgerufen werden:

select * from metadata where transactionId = '123456'

Nächste Schritte

Weitere Informationen finden Sie im zugehörigen Architekturleitfaden:

Siehe die verwandten Architekturen: