Skapa en deltasjö för att stödja ad hoc-frågor i online-fritids- och resebokningar

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Den här arkitekturen innehåller ett exempel på delta lake för resebokning, där stora mängder rådata genereras med hög frekvens.

Apache® och Apache Spark™ är antingen registrerade varumärken eller varumärken som tillhör Apache Software Foundation i USA och/eller andra länder. Inget godkännande från Apache Software Foundation underförstås av användningen av dessa märken.

Arkitektur

Diagram över Delta Lake-arkitektur.

Ladda ned en Visio-fil med den här arkitekturen.

Scenarier för fritids- och resebokningar kan generera stora mängder rådata med hög frekvens. Du kanske dock inte behöver indexering av hela innehållet i dessa dokument. Användare kan till exempel behöva söka efter ett känt transaktions-ID, eller med ett kundnamn vid ett visst datum, för att hämta en uppsättning dokument som är intressanta för dem.

Dataflöde

Konceptet bakom den här arkitekturen består i att avkoda metadata som är användbara för sökning från data utan data:

  • Endast metadata indexeras inom en frågebar tjänst (till exempel Spark), medan faktiska data lagras i en datasjö.
  • Rådatadokument i en datasjö är länkade till indexerade metadata via deras sökväg.
  • När du frågar efter dokument söker tjänsten igenom dokumentens metadata, och i sin tur hämtas de faktiska dokumenten från datasjön efter deras sökväg.

Den här lösningen sänker kostnaderna avsevärt och ökar prestandan, eftersom metadata utgör en bråkdel av hela dataegendomen (till exempel kan petabyte rådata beskrivas med tiotals gigabyte koncisa metadata).

Dessutom är det en typisk utmaning för den här typen av scenario att hantera blandning av historiska djup- och realtidskrav i ett enhetligt, lättbehållet och högpresterande system. Delta Lake-arkitekturen svarar på den här utmaningen.

Komponenter

Azure App Service är en plattform som en tjänst (PaaS) för att skapa och vara värd för appar på hanterade virtuella datorer. App Service hanterar den underliggande beräkningsinfrastruktur som dina appar körs på och tillhandahåller övervakning av resursanvändningskvoter och appmått, loggning av diagnostikinformation och aviseringar baserat på mått.

Azure Data Factory är Azures ETL-tjänst (cloud extract, transform, and load) för skalbar serverlös dataintegrering och datatransformering. Tjänsten har ett kodlöst användargränssnitt för intuitiv redigering och enkel övervakning och hantering. Du kan också lyfta och flytta befintliga SSIS-paket (SQL Server Integration Services) till Azure och köra dem med fullständig kompatibilitet i Azure Data Factory.

Azure Data Lake Storage Gen2 är en uppsättning funktioner som är dedikerade till stordataanalys som bygger på Azure Blob Storage. Data Lake Storage Gen2 konvergerar funktionerna i Azure Data Lake Storage Gen1 med Azure Blob Storage. Data Lake Storage Gen2 tillhandahåller till exempel filsystemssemantik, säkerhet på filnivå och skalning. Eftersom dessa funktioner bygger på Blob Storage får du även låg kostnad, nivåindelad lagring med hög tillgänglighet/haveriberedskapsfunktioner.

Azure Event Hubs är en fullständigt hanterad datainmatningstjänst i realtid som är enkel, betrodd och skalbar. Strömma miljontals händelser per sekund från valfri källa, så att du kan bygga upp dynamiska datapipelines och direkt vara redo för verksamhetsutmaningar.

Azure Databricks är en Apache Spark-baserad dataanalysplattform som är optimerad för Microsoft Azure Cloud Services. Azure Databricks erbjuder tre miljöer för att utveckla dataintensiva program: Databricks SQL, Databricks Datavetenskap & Engineering och Databricks Machine Learning.

Alternativ

Som ett alternativ till att endast indexera metadata kan du indexera alla rådata i en tjänst som erbjuder frågefunktioner, till exempel Azure Databricks, Azure Synapse Analytics, Azure Cognitive Search eller Azure Data Explorer. Den här metoden är mer omedelbar, men var uppmärksam på den kombinerade effekten av datastorlek, prestandakrav och uppdateringsfrekvens, särskilt ur ett kostnadsperspektiv.

I motsats till att använda en deltasjö behåller användning av en Lambda-arkitektur realtidsdata på en annan lagringsplats än historiska data, och klienten kör logiken för att göra heterogena frågor transparenta för användaren. Fördelen med den här lösningen är den större uppsättningen tjänster som du kan använda (till exempel Azure Stream Analytics och Azure SQL Database), men arkitekturen blir mer komplex och kodbasen dyrare att underhålla.

Spark distribueras med Azure Databricks, Azure Synapse Analytics och Azure HDInsight. Därför kan den här arkitekturen implementeras med någon av dessa Azure-datatjänster, helst med en ny Spark-version som stöder Delta Lake 0.8 eller 1.0.

Information om scenario

Synligheten för rådata i scenarier för fritids- och resebokningar är viktigt för flera aktörer. Tekniska supportteam övervakar realtidsdiagnostik för att kontinuerligt övervaka transaktionsbearbetning och snabbt reagera på oönskade problem. Datatekniker övervakar export av data för granskning av intressenter och för att mata analys i realtid. Kundsupportteam kräver historiska och senaste data för att hantera kundförfrågningar och klagomål. Slutligen ser juridiska team till att efterlevnadsuppgifterna respekteras och att rättsliga åtgärder utförs. Dessa typer av krav är typiska på marknadsplatser som aggregerar externa leverantörer och hanterar användarköp. Till exempel erbjuder bokningssystem för fritid och resor användare och tjänsteleverantörer för att söka efter tjänster, aggregera meningsfulla erbjudanden från leverantörer och hantera användarreservationer.

Diagram över en marknadsplats med tjänstleverantörer och B2B- och B2C-användare.

Potentiella användningsfall

Denna arkitektur är idealisk för rese- och gästfrihetsindustrin. Den gäller för följande scenarier:

  • Hämtar snabbt antingen realtidsdokument (till exempel för diagnostik) eller historiska (för efterlevnad) i sitt ursprungliga format.
  • Hantera petabyte med data.
  • Garantera sekunders prestanda för realtidsdiagnostik.
  • Uppnå en enhetlig metod för realtidsdiagnostik, historiska frågor och matningsanalys.
  • Mata in underordnade realtidsanalyser.
  • Kontrollera kostnader.
  • Insourcing-data som rådata (till exempel som json-, XML- eller csv-filer).
  • När en bråkdel av data räcker för att beskriva frågor.
  • När användare vill hämta fullständiga rådatadokument.
  • När den totala datastorleken skulle kräva skalning av systemet över målpriset.

Den här arkitekturen kanske inte är lämplig när:

  • Data är insourced som postuppsättningar.
  • Användare måste köra analys.
  • Användarna är villiga att använda sitt eget paketerade BI-verktyg.
  • Storleken på data är inte en utmaning ur ett kostnadsperspektiv.

Rådokument krävs inte nödvändigtvis.

Att tänka på

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som kan användas för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.

Prestandaeffektivitet

Prestandaeffektivitet handlar om att effektivt skala arbetsbelastningen baserat på användarnas behov. Mer information finns i Översikt över grundpelare för prestandaeffektivitet.

Användarna kommer att utföra ett dubbelhopp för att komma åt data. De frågar först efter metadata och hämtar sedan den önskade uppsättningen dokument. Det kan vara svårt att återanvända befintliga eller paketerade klienttillgångar.

Azure Data Lake Storage Gen2 har tre åtkomstnivåer: frekvent, lågfrekvent och arkiv. I scenarier där dokument ibland hämtas bör den lågfrekventa prestandanivån garantera liknande prestanda som den frekventa prestandanivån, men med fördelen med lägre kostnader. I scenarier där sannolikheten för hämtning är högre med nyare data bör du överväga att blanda lågfrekventa och frekventa nivåer. Lagring på arkivnivå kan också vara ett alternativ till hård borttagning, samt minska storleken på data genom att endast behålla meningsfull information eller mer aggregerade data.

Datasjön hanterar potentiellt petabyte med data, så datakvarhållningsprinciper gäller vanligtvis. Datastyrningslösningar bör användas för att hantera datalivscykeln, till exempel när gamla data ska flyttas mellan frekventa och lågfrekventa lagringsnivåer, när gamla data ska tas bort eller arkiveras och när information ska aggregeras till en nedströmsanalyslösning.

Fundera på hur den här metoden kan fungera med scenarier för nedströmsanalys. Även om den här exempelarbetsbelastningen inte är avsedd för analys är det lämpligt att mata in nedströmsanalyser i realtid, medan batchscenarier kan matas från datasjön i stället.

Skalbarhet

Azure Event Hubs är mycket mångsidigt när det gäller att avkoda ett transaktionssystem som genererar rådata från ett diagnostik- och efterlevnadssystem. är lätt att implementera i redan etablerade arkitekturer. och i slutändan är lätt att använda. Transaktionssystemet kan dock redan använda strömningsmönstret för att bearbeta inkommande dokument. I så fall skulle du förmodligen behöva integrera logik för att hantera diagnostik och efterlevnad i strömningsprogrammet som en underström.

DevOps

För att distribuera de tjänster som används i den här exempelarbetsbelastningen automatiskt är det bäst att använda ci/CD-processer (kontinuerlig integrering och kontinuerlig distribution). Överväg att använda en lösning som Azure DevOps eller GitHub Actions.

Kostnadsoptimering

Kostnadsoptimering handlar om att titta på sätt att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i Översikt över kostnadsoptimeringspelare.

Normalt beräknar du kostnader med hjälp av Azures priskalkylator. Mer information om andra överväganden finns i avsnittet om kostnader i Microsoft Azure Well-Architected Framework .

Distribuera det här scenariot

I följande exempelarkitektur förutsätter vi att en eller flera Azure Event Hubs-namnområden innehåller strukturerade rådatadokument (till exempel json- eller XML-filer). Den faktiska typen och formatet på dokument och källtjänster, och deras typ av integrering, är dock starkt beroende av det specifika scenariot och arkitekturen.

Strömning

Med Spark Structured Streaming hämtas rådata, dekomprimeras, parsas och översätts till tabelldata i en strömmande DataFrame.

Följande PySpark-kodfragment används för att läsa in en strömmande DataFrame från Event Hubs:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

Följande kodfragment används för att bearbeta strömmande DataFrame. Det dekomprimerar först Event Hubs-meddelandet om det behövs och parsar sedan dess json-struktur till ett tabellformat. Den här koden är ett exempel och bör anpassas till ditt specifika scenario:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

Faktisk databehandling består av två steg. Den första är att extrahera metadata för att hjälpa till att söka i rådatadokumenten efter bearbetningen. Faktiska metadata beror på användningsfallet, men generaliserbara exempel skulle vara relevanta datum och identifierare, dokumenttyper, källtjänst och alla typer av kategorier:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

Det andra bearbetningssteget är att generera en sökväg till Azure Data Lake Storage Gen2, där du lagrar rådatadokument:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Metadatainmatning i en deltasjö

Metadata skrivs till en deltatabell som möjliggör frågefunktioner i realtid. Skrivningar strömmas i en buffert och frågor till tabellen kan sammanfoga resultat från bufferten med dem från den historiska delen av tabellen.

Följande kodfragment visar hur du definierar en deltatabell i metaarkivet och partitioneras efter datum:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Observera att fältet transactionId är numeriskt. Vanliga meddelanden som skickar distribuerade system kan använda GUID:er för att unikt identifiera transaktioner i stället. Numeriska datatyper ger dock bättre frågeprestanda på de flesta dataplattformar.

Det kan vara svårt att tilldela en unik transaktionsidentifierare med tanke på molndataplattformarnas distribuerade karaktär (till exempel Spark). En användbar metod är att basera en sådan transaktionsidentifierare på en partitionsidentifierare (t.ex. partitionsnumret för Event Hubs) och ett inkrementellt tal inom partitionen. Ett exempel på den här metoden är monotonically_increasing_id() i Azure Databricks.

Följande kodfragment visar hur du lägger till strömmen med metadata för rådata i deltatabellen:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Observera att partitionering hanteras när strömmen skrivs enligt tabellschemat.

Datainmatning i en datasjö

Faktiska rådatadokument skrivs till en lämplig lagringsprestandanivå i Azure Data Lake Gen2.

Följande kodfragment visar en enkel funktion för att ladda upp en fil till Azure Data Lake Store Gen2. Med en foreach-metod i DataStreamWriter klassen kan du ladda upp filen som finns i varje post i strömmande DataFrame:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Klient

Klienten kan vara ett anpassat webbprogram som använder metadata för att hämta dokumentsökvägar från deltatabellen med sql-standardinstruktioner och i sin tur det faktiska dokumentet från datasjön med standard-API:er för Azure Data Lake Storage Gen2.

Följande kodfragment visar till exempel hur du hämtar sökvägarna för alla dokument i en viss transaktion:

select * from metadata where transactionId = '123456'

Nästa steg

Se den relaterade arkitekturvägledningen:

Se dessa relaterade arkitekturer: