Upravit

Sdílet prostřednictvím


Vytvoření delta jezera pro podporu ad hoc dotazů v online volném čase a rezervaci cesty

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Tato architektura poskytuje příklad delta jezera pro rezervaci cest, kde se generují velké objemy nezpracovaných dokumentů s vysokou frekvencí.

Apache® a Apache Spark™ jsou registrované ochranné známky nebo ochranné známky Apache Software Foundation v USA a/nebo v jiných zemích. Použití těchto značek nevyžaduje žádné doporučení Apache Software Foundation.

Architektura

Diagram architektury Delta Lake

Stáhněte si soubor aplikace Visio s touto architekturou.

Scénáře pro volnočasové a cestovní rezervace můžou generovat velké množství nezpracovaných dokumentů s vysokou frekvencí. Možná ale nebudete muset indexovat celý obsah těchto dokumentů. Uživatelé mohou například potřebovat hledat podle známého ID transakce nebo podle jména zákazníka v určitém datu, aby mohli načíst sadu dokumentů, které jsou pro ně zajímavé.

Tok dat

Koncept této architektury se skládá z oddělení metadat, která jsou užitečná pro vyhledávání od holých dat:

  • V rámci dotazovatelné služby (například Sparku) se indexují pouze metadata, zatímco skutečná data jsou uložená v datovém jezeře.
  • Nezpracované dokumenty v datovém jezeře jsou propojeny s indexovanými metadaty jejich cestou.
  • Při dotazování na dokumenty služba prohledá metadata dokumentů a skutečné dokumenty se načtou z datového jezera jejich cestou.

Toto řešení výrazně snižuje náklady a zvyšuje výkon, protože metadata tvoří zlomek celého datového majetku (například petabajty nezpracovaných dokumentů je možné popsat desítkami gigabajtů stručných metadat).

Kromě toho je správa kombinování historických hloubkových požadavků a požadavků v reálném čase do jednotného, snadno udržovatelného a vysoce výkonného systému typickým problémem tohoto typu scénáře. Tato výzva odpovídá architektuře Delta Lake .

Komponenty

Aplikace Azure Service je platforma jako služba (PaaS) pro vytváření a hostování aplikací ve spravovaných virtuálních počítačích. App Service spravuje základní výpočetní infrastrukturu, na které vaše aplikace běží, a poskytuje monitorování kvót využití prostředků a metrik aplikací, protokolování diagnostických informací a upozornění na základě metrik.

Azure Data Factory je služba extrakce, transformace a načítání (ETL) Azure pro integraci a transformaci dat bez serveru na více instancí. Nabízí uživatelské rozhraní bez kódu pro intuitivní vytváření a monitorování a správu prostřednictvím jednoho podokna. Stávající balíčky SSIS (SQL Server Integration Services) můžete také přesunout do Azure a spouštět je s plnou kompatibilitou ve službě Azure Data Factory.

Azure Data Lake Storage Gen2 je sada funkcí vyhrazených pro analýzy velkých objemů dat, která je založená na azure Blob Storage. Data Lake Storage Gen2 konverguje možnosti Azure Data Lake Storage Gen1 se službou Azure Blob Storage. Například Data Lake Storage Gen2 poskytuje sémantiku systému souborů, zabezpečení na úrovni souborů a škálování. Vzhledem k tomu, že tyto funkce jsou založené na službě Blob Storage, získáte také nízkonákladové vrstvené úložiště s možnostmi vysoké dostupnosti nebo zotavení po havárii.

Azure Event Hubs je plně spravovaná služba pro příjem dat v reálném čase, která je jednoduchá, důvěryhodná a škálovatelná. Streamujte miliony událostí za sekundu z jakéhokoli zdroje, abyste mohli vytvářet dynamické datové kanály a okamžitě reagovat na obchodní výzvy.

Azure Databricks je platforma pro analýzu dat založená na Apache Sparku optimalizovaná pro cloudové služby Microsoft Azure. Azure Databricks nabízí tři prostředí pro vývoj aplikací náročných na data: Databricks SQL, Databricks Datová Věda & Engineering a Databricks Machine Learning.

Alternativy

Jako alternativu k indexování metadat můžete indexovat všechna nezpracovaná data ve službě, která nabízí možnosti dotazů, jako jsou Azure Databricks, Azure Synapse Analytics, Azure Cognitive Search nebo Azure Data Explorer. Tento přístup je okamžitější, ale věnujte pozornost kombinovanému efektu velikosti dat, požadavkům na výkon a frekvenci aktualizací, zejména z hlediska nákladů.

Na rozdíl od použití delta lake udržuje použití architektury Lambda data v reálném čase v jiném úložišti než historická data a klient spustí logiku, aby heterogenní dotazy byly pro uživatele transparentní. Výhodou tohoto řešení je větší sada služeb, které můžete používat (například Azure Stream Analytics a Azure SQL Database), ale architektura se stává složitější a základ kódu je dražší pro údržbu.

Spark je distribuovaný pomocí Azure Databricks, Azure Synapse Analytics a Azure HDInsight. Tuto architekturu je proto možné implementovat s některou z těchto datových služeb Azure, nejlépe s nedávnou verzí Sparku podporující Delta Lake 0.8 nebo 1.0.

Podrobnosti scénáře

Viditelnost nezpracovaných dat ve scénářích pro volný čas a cestovní rezervace je pro více subjektů důležitá. Týmy technické podpory dohlížejí na diagnostiku v reálném čase, aby nepřetržitě monitorovaly zpracování transakcí a rychle reagovaly na nežádoucí problémy. Datoví inženýři dohlížejí na export dat pro kontrolu zúčastněných stran a za účelem podávání analýz v reálném čase. Týmy zákaznické podpory vyžadují historická a nedávná data pro zpracování dotazů zákazníků a stížností. A konečně právní týmy zajišťují dodržování předpisů a provádějí právní akce. Tyto typy požadavků jsou typické v marketplace, které agregují externí poskytovatele a spravují nákupy uživatelů. Například systémy pro volný čas a cestovní rezervace oddělují uživatele a poskytovatele služeb pro vyhledávání služeb, agregují smysluplné nabídky od poskytovatelů a spravují rezervace uživatelů.

Diagram marketplace s poskytovateli služeb a uživateli B2B a B2C

Potenciální případy použití

Tato architektura je ideální pro cestovní a pohostinské odvětví. Platí pro následující scénáře:

  • Rychlé načítání nezpracovaných dokumentů v reálném čase (například pro diagnostiku) nebo historických (pro dodržování předpisů) v původním formátu
  • Správa petabajtů dat
  • Zajištění výkonu v řádu sekund pro diagnostiku v reálném čase
  • Dosažení jednotného přístupu k diagnostice v reálném čase, historickým dotazům a analýzám krmení
  • Podávání podřízených analýz v reálném čase.
  • Řízení nákladů
  • Insourcing dat jako nezpracované dokumenty (například jako json, xml nebo soubory CSV).
  • Pokud k popisu dotazů stačí zlomek dat.
  • Když uživatelé chtějí načíst úplné nezpracované dokumenty.
  • Pokud by celková velikost dat vyžadovala škálování systému nad cílovou cenu.

Tato architektura nemusí být vhodná v těchto případech:

  • Data jsou insourcovaná jako sady záznamů.
  • Uživatelé musí spouštět analýzy.
  • Uživatelé jsou ochotni používat vlastní zabalený nástroj BI.
  • Velikost dat není z hlediska nákladů výzvou.

Nezpracované dokumenty se nemusí nutně vyžadovat.

Důležité informace

Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které je možné použít ke zlepšení kvality úlohy. Další informace naleznete v tématu Microsoft Azure Well-Architected Framework.

Efektivita výkonu

Efektivita výkonu je schopnost úlohy škálovat se tak, aby efektivním způsobem splňovala požadavky, které na ni kladou uživatelé. Další informace najdete v tématu Přehled pilíře efektivity výkonu.

Uživatelé budou pro přístup k datům provádět dvojité směrování. Nejprve se dotazují na metadata a pak načtou požadovanou sadu dokumentů. Může být obtížné znovu použít existující nebo zabalené klientské prostředky.

Azure Data Lake Storage Gen2 poskytuje tři úrovně přístupu: horkou, studenou a archivní. Ve scénářích, kdy se dokumenty občas načítají, by studená úroveň výkonu měla zaručit podobný výkon jako horká úroveň výkonu, ale s výhodou nižších nákladů. Ve scénářích, kdy je pravděpodobnost načtení vyšší s novějšími daty, zvažte kombinaci studených a horkých úrovní. Použití úložiště archivní vrstvy může také poskytnout alternativu k pevnému odstranění a také snížit velikost dat tím, že zachová jenom smysluplné informace nebo více agregovaných dat.

Datové jezero bude potenciálně spravovat petabajty dat, takže obecně platí zásady uchovávání dat. Řešení zásad správného řízení dat by se měla použít ke správě životního cyklu dat, například při přesouvání starých dat mezi horkou a studenou úrovní úložiště, kdy odstranit nebo archivovat stará data a kdy agregovat informace do řešení pro podřízenou analýzu.

Zvažte, jak tento přístup může fungovat se scénáři podřízené analýzy. I když tato ukázková úloha není určená pro analýzy, je vhodná pro podávání podřízených analýz v reálném čase, zatímco dávkové scénáře by se mohly místo toho dodávat z datového jezera.

Škálovatelnost

Azure Event Hubs je velmi všestranná, pokud jde o oddělení transakčního systému, který generuje nezpracované dokumenty ze systému diagnostiky a dodržování předpisů; je snadné implementovat v již zavedených architekturách; a nakonec je snadné ho používat. Transakční systém však již může ke zpracování příchozích dokumentů použít model streamování. V takovém případě byste pravděpodobně museli integrovat logiku pro správu diagnostiky a dodržování předpisů do streamovací aplikace jako podstream.

DevOps

Pro automatické nasazení používaných služeb v této ukázkové úloze je nejlepší použít procesy kontinuální integrace a průběžného nasazování (CI/CD). Zvažte použití řešení, jako je Azure DevOps nebo GitHub Actions.

Optimalizace nákladů

Optimalizace nákladů se zabývá způsoby, jak snížit zbytečné výdaje a zlepšit efektivitu provozu. Další informace najdete v tématu Přehled pilíře optimalizace nákladů.

K odhadu nákladů použijte cenovou kalkulačku Azure. Další informace o dalších aspektech najdete v části Náklady v architektuře Microsoft Azure.

Nasazení tohoto scénáře

V následující ukázkové architektuře předpokládáme, že jeden nebo více oborů názvů služby Azure Event Hubs bude obsahovat strukturované nezpracované dokumenty (například soubory JSON nebo XML). Skutečný typ a formát dokumentů a zdrojových služeb a jejich typ integrace je však vysoce závislý na konkrétním scénáři a architektuře.

Streamování

Se strukturovaným streamováním Sparku se natahují, dekomprimují, parsují a překládají na tabulková data v datovém rámci streamování.

Následující fragment kódu PySpark se používá k načtení streamovaného datového rámce ze služby Event Hubs:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

Následující fragment kódu se používá ke zpracování streamovaného datového rámce. Nejprve dekomprimuje zprávu Event Hubs v případě potřeby a pak parsuje její strukturu JSON do tabulkového formátu. Tento kód je příkladem a měl by se přizpůsobit vašemu konkrétnímu scénáři:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

Skutečné zpracování dat se skládá ze dvou kroků. První je extrahovat metadata, která pomáhají prohledávat nezpracované dokumenty po zpracování. Skutečná metadata závisí na případu použití, ale generalizovatelné příklady by byly relevantní kalendářní data a identifikátory, typy dokumentů, zdrojová služba a jakýkoli typ kategorie:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

Druhým krokem zpracování je vygenerování cesty ke službě Azure Data Lake Storage Gen2, ve které budete ukládat nezpracované dokumenty:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Příjem metadat v delta lake

Metadata se zapisují do tabulky Delta, která umožňuje možnosti dotazů v reálném čase. Zápisy se streamují do vyrovnávací paměti a dotazy do tabulky můžou sloučit výsledky z vyrovnávací paměti s těmi z historické části tabulky.

Následující fragment kódu ukazuje, jak definovat rozdílovou tabulku v metastoru a rozdělit ji podle data:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Všimněte si, že pole TransactionId je číselné. Typické zprávy předávané distribuovanými systémy můžou místo toho používat identifikátory GUID k jednoznačné identifikaci transakcí. Číselné datové typy ale umožňují větší výkon dotazů ve většině datových platforem.

Přiřazení jedinečného identifikátoru transakce může být náročné vzhledem k distribuované povaze cloudových datových platforem (například Spark). Užitečným přístupem je založit takový identifikátor transakce na identifikátoru oddílu (jako je číslo oddílu Event Hubs) a přírůstkové číslo v rámci oddílu. Příkladem tohoto přístupu je monotonically_increasing_id() v Azure Databricks.

Následující fragment kódu ukazuje, jak k tabulce Delta připojit stream s metadaty nezpracovaných dokumentů:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Všimněte si, že dělení se spravuje při zápisu datového proudu podle schématu tabulky.

Příjem dat v datovém jezeře

Skutečné nezpracované dokumenty se zapisují do odpovídající úrovně výkonu úložiště v Azure Data Lake Gen2.

Následující fragment kódu ukazuje jednoduchou funkci pro nahrání souboru do Azure Data Lake Store Gen2; Pomocí metody foreach ve DataStreamWriter třídě můžete nahrát soubor hostovaný v každém záznamu streamovaného datového rámce:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Klient

Klientem může být vlastní webová aplikace, která používá metadata k načtení cest k dokumentům z tabulky Delta se standardními příkazy SQL a naopak skutečným dokumentem z datového jezera se standardními rozhraními API Azure Data Lake Storage Gen2.

Následující fragment kódu například ukazuje, jak načíst cesty všech dokumentů v určité transakci:

select * from metadata where transactionId = '123456'

Další kroky

Projděte si související doprovodné materiály k architektuře:

Projděte si tyto související architektury: