Questa architettura fornisce un esempio di delta lake per la prenotazione di viaggi, in cui vengono generati grandi quantità di documenti non elaborati a una frequenza elevata.
Apache e Apache® Spark™ sono marchi registrati o marchi di Apache Software Foundation nei Stati Uniti e/o in altri paesi. L'uso di questi marchi non implica alcuna approvazione da parte di Apache Software Foundation.
Architettura
Scaricare un file di Visio di questa architettura.
Gli scenari di prenotazione del tempo libero e di viaggio possono generare grandi quantità di documenti non elaborati ad alta frequenza. Tuttavia, potrebbe non essere necessario indicizzare l'intero contenuto di questi documenti. Ad esempio, gli utenti potrebbero dover cercare in base a un ID transazione noto o a un nome cliente in una determinata data, per recuperare un set di documenti interessanti.
Flusso di dati
Il concetto alla base di questa architettura consiste nel disaccoppiare i metadati utili per la ricerca da dati bare:
- Solo i metadati vengono indicizzati all'interno di un servizio su cui è possibile eseguire query (ad esempio Spark), mentre i dati effettivi vengono archiviati in un data lake.
- I documenti non elaborati in un data lake sono collegati ai metadati indicizzati in base al percorso.
- Quando si eseguono query per i documenti, il servizio cerca i metadati dei documenti e a sua volta i documenti effettivi verranno recuperati dal data lake in base al percorso.
Questa soluzione riduce notevolmente i costi e aumenta le prestazioni, poiché i metadati costituiscono una frazione dell'intero patrimonio di dati (ad esempio, petabyte di documenti non elaborati possono essere descritti da decine di gigabyte di metadati concisi).
Inoltre, la gestione della fusione di profondità cronologica e requisiti in tempo reale in un sistema uniforme, facile da gestire e ad alte prestazioni è una sfida tipica di questo tipo di scenario. L'architettura Delta Lake risponde a questa sfida.
Componenti
app Azure Service è una piattaforma distribuita come servizio (PaaS) per la compilazione e l'hosting di app in macchine virtuali gestite. servizio app gestisce l'infrastruttura di calcolo sottostante in cui vengono eseguite le app e fornisce il monitoraggio delle quote di utilizzo delle risorse e delle metriche delle app, la registrazione delle informazioni di diagnostica e gli avvisi in base alle metriche.
Azure Data Factory è il servizio di estrazione, trasformazione e caricamento del cloud di Azure per l'integrazione e la trasformazione dei dati serverless con scalabilità orizzontale. Offre un'interfaccia utente senza codice per la creazione intuitiva di contenuti e una singola console per il monitoraggio e la gestione. È anche possibile trasferire in modalità lift-and-shift i pacchetti di SQL Server Integration Services (SSIS) esistenti in Azure ed eseguirli con compatibilità completa in Azure Data Factory.
Azure Data Lake Storage Gen2 è un set di funzionalità dedicate all'analisi dei Big Data, basate su Archiviazione BLOB di Azure. Data Lake Storage Gen2 converge le funzionalità di Azure Data Lake Storage Gen1 con Archiviazione BLOB di Azure. Offre ad esempio semantica dei file system, sicurezza a livello di file e scalabilità. Poiché queste funzionalità sono basate sull'archiviazione BLOB, si ottiene anche un'archiviazione a livelli a basso costo, con funzionalità di disponibilità elevata/ripristino di emergenza.
Hub eventi di Azure è un servizio di inserimento dati completamente gestito e in tempo reale semplice, attendibile e scalabile. Consente di trasmettere in streaming milioni di eventi al secondo da qualsiasi origine per creare pipeline di dati dinamiche e rispondere immediatamente alle sfide aziendali.
Azure Databricks è una piattaforma di analisi dei dati basata su Apache Spark ottimizzata per Servizi cloud di Microsoft Azure. Azure Databricks offre tre ambienti per lo sviluppo di applicazioni a elevato utilizzo di dati: Databricks SQL, Databricks Data Science & Engineering e Databricks Machine Learning.
Alternative
In alternativa all'indicizzazione solo dei metadati, è possibile indicizzare tutti i dati non elaborati in un servizio che offre funzionalità di query, ad esempio Azure Databricks, Azure Synapse Analytics, Ricerca cognitiva di Azure o Azure Esplora dati. Questo approccio è più immediato, ma prestare attenzione all'effetto combinato delle dimensioni dei dati, dei requisiti di prestazioni e della frequenza di aggiornamento, soprattutto dal punto di vista dei costi.
Contrariamente all'uso di un delta lake, l'uso di un'architettura Lambda mantiene i dati in tempo reale in un repository diverso rispetto ai dati cronologici e il client esegue la logica per rendere trasparenti le query eterogenee per l'utente. Il vantaggio di questa soluzione è il set più ampio di servizi che è possibile usare (ad esempio Analisi di flusso di Azure e database SQL di Azure), ma l'architettura diventa più complessa e la codebase più costosa da gestire.
Spark viene distribuito con Azure Databricks, Azure Synapse Analytics e Azure HDInsight. Di conseguenza, questa architettura può essere implementata con uno di questi servizi dati di Azure, preferibilmente con una versione di Spark recente che supporta Delta Lake 0.8 o 1.0.
Dettagli dello scenario
La visibilità dei dati non elaborati negli scenari di prenotazione del tempo libero e del viaggio è importante per più attori. I team di supporto tecnico supervisionano la diagnostica in tempo reale per monitorare continuamente l'elaborazione delle transazioni e reagire rapidamente a problemi indesiderati. I data engineer sorvegliano l'esportazione dei dati per la revisione degli stakeholder e il feed di analisi in tempo reale. I team di supporto clienti richiedono dati cronologici e recenti per gestire richieste e reclami dei clienti. Infine, i team legali garantiscono il rispetto dei compiti di conformità e le azioni legali eseguite. Questi tipi di requisiti sono tipici nei marketplace che aggregano provider esterni e gestiscono gli acquisti degli utenti. Ad esempio, i sistemi di prenotazione del tempo libero e di viaggio disintermediano gli utenti e i provider di servizi per la ricerca di servizi, l'aggregazione di offerte significative dai provider e la gestione delle prenotazioni degli utenti.
Potenziali casi d'uso
Questa architettura è ideale per le industrie di viaggi e ospitalità. È applicabile agli scenari seguenti:
- Recupero rapido di documenti non elaborati in tempo reale (ad esempio, per la diagnostica) o cronologici (per la conformità) nel formato originale.
- Gestione di petabyte di dati.
- Garanzia delle prestazioni a intervalli di secondi per la diagnostica in tempo reale.
- Ottenere un approccio unificato alla diagnostica in tempo reale, alle query cronologiche e all'analisi di alimentazione.
- Alimentazione di analisi in tempo reale downstream.
- Controllo dei costi.
- Insourcing dei dati come documenti non elaborati (ad esempio, come file json, xml o csv).
- Quando una frazione di dati è sufficiente per descrivere le query.
- Quando gli utenti vogliono recuperare documenti non elaborati completi.
- Quando le dimensioni totali dei dati richiedono il ridimensionamento del sistema al di sopra del prezzo di destinazione.
Questa architettura potrebbe non essere adatta quando:
- I dati vengono inseriti come recordset.
- Gli utenti devono eseguire l'analisi.
- Gli utenti sono disposti a usare il proprio strumento bi in pacchetto.
- Le dimensioni dei dati non sono una sfida dal punto di vista dei costi.
I documenti non elaborati non sono necessariamente necessari.
Considerazioni
Queste considerazioni implementano i pilastri di Azure Well-Architected Framework, che è un set di set di principi guida che possono essere usati per migliorare la qualità di un carico di lavoro. Per altre informazioni, vedere Framework ben progettato di Microsoft Azure.
Efficienza prestazionale
L'efficienza delle prestazioni è la capacità di dimensionare il carico di lavoro per soddisfare in modo efficiente le richieste poste dagli utenti. Per altre informazioni, vedere Panoramica dell'efficienza delle prestazioni.
Gli utenti eseguiranno un doppio hop per accedere ai dati. Eseguiranno prima una query sui metadati e quindi recupereranno il set di documenti desiderato. Potrebbe essere difficile riutilizzare gli asset client esistenti o in pacchetto.
Azure Data Lake Storage Gen2 offre tre livelli di accesso: accesso frequente, sporadico e archivio. Negli scenari in cui i documenti vengono occasionalmente recuperati, il livello di prestazioni ad accesso sporadico deve garantire prestazioni simili al livello di prestazioni ad accesso frequente, ma con il vantaggio di costi inferiori. Negli scenari in cui la probabilità di recupero è più elevata con i dati più recenti, è consigliabile combinare i livelli ad accesso sporadico e frequente. L'uso dell'archiviazione a livelli di archiviazione può anche offrire un'alternativa all'eliminazione rigida, oltre a ridurre le dimensioni dei dati mantenendo solo informazioni significative o più dati aggregati.
Il data lake gestirà potenzialmente petabyte di dati, quindi in genere si applicano i criteri di conservazione dei dati. Le soluzioni di governance dei dati devono essere usate per gestire il ciclo di vita dei dati, ad esempio quando spostare i dati obsoleti tra livelli di archiviazione ad accesso frequente e sporadico, quando eliminare o archiviare dati obsoleti e quando aggregare le informazioni in una soluzione di analisi downstream.
Valutare il modo in cui questo approccio può funzionare con gli scenari di analisi downstream. Anche se questo carico di lavoro di esempio non è destinato all'analisi, è appropriato per l'alimentazione di analisi in tempo reale downstream, mentre gli scenari batch possono essere inseriti dal data lake.
Scalabilità
Hub eventi di Azure è estremamente versatile quando si tratta di disaccoppiare un sistema transazionale che genera documenti non elaborati da un sistema di diagnostica e conformità, è facile da implementare in architetture già consolidate e, in definitiva, è facile da usare. Tuttavia, il sistema transazionale potrebbe già usare il modello di streaming per elaborare i documenti in ingresso. In tal caso, è probabile che sia necessario integrare la logica per la gestione della diagnostica e della conformità nell'applicazione di streaming come sottostream.
DevOps
Per distribuire automaticamente i servizi usati in questo carico di lavoro di esempio, è consigliabile usare processi di integrazione continua e distribuzione continua (CI/CD). Prendere in considerazione l'uso di una soluzione come Azure DevOps o GitHub Actions.
Ottimizzazione dei costi
L'ottimizzazione dei costi riguarda l'analisi dei modi per ridurre le spese non necessarie e migliorare l'efficienza operativa. Per altre informazioni, vedere Panoramica del pilastro di ottimizzazione dei costi.
In linea generale, usare il calcolatore dei prezzi di Azure per stimare i costi. Per altre considerazioni, vedere la sezione relativa ai costi in Microsoft Azure Well-Architected Framework .
Distribuire lo scenario
Nell'architettura di esempio seguente si presuppone che uno o più spazi dei nomi Hub eventi di Azure contengano documenti non elaborati strutturati, ad esempio file JSON o XML. Tuttavia, il tipo e il formato effettivi di documenti e servizi di origine e il relativo tipo di integrazione dipendono fortemente dallo scenario e dall'architettura specifici.
Streaming
Con Spark Structured Streaming, i dati non elaborati vengono estratti, decompressi, analizzati e convertiti in dati tabulari in un dataframe di streaming.
Il frammento di codice PySpark seguente viene usato per caricare un dataframe di streaming da Hub eventi:
# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] =
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup
streaming_df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
Il frammento di codice seguente viene usato per elaborare il dataframe di streaming. Decomprime innanzitutto il messaggio di Hub eventi, se necessario, e quindi analizza la struttura json in un formato tabulare. Questo codice è un esempio e deve essere adattato allo scenario specifico:
# Code tested in Databricks with Delta Lake 1.0
# defines an UDF to unzip the Event Hubs Body field, assuming it
is gzipped
import zlib
def DecompressFunction(data):
decoded_data = zlib.decompress(bytes(data), 15+32)
return decoded_data.decode()
Decompress = udf(lambda body: DecompressFunction(body),
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody",
Decompress(col("body"))).select("DecodedBody")
# Parse json message from Event Hubs body, assuming the raw
document is stored in the data field, and the others fields hold
some metadata about it
schema = StructType([ \
StructField("transactionId", LongType(),True), \
StructField("timestamp",TimestampType(),True), \
StructField("providerName", StringType(),True), \
StructField("document", StringType(),True), \
StructField("documentType", StringType(),True)
])
parsed_body_df = decoded_body_df.withColumn("jsonBody",
from_json(col("DecodedBody"), schema)).select("jsonBody")
L'elaborazione effettiva dei dati è costituita da due passaggi. Il primo consiste nell'estrarre i metadati per facilitare la ricerca dei documenti non elaborati dopo l'elaborazione. I metadati effettivi dipendono dal caso d'uso, ma gli esempi generalizzabili sono date e identificatori rilevanti, tipi di documento, servizio di origine e qualsiasi tipo di categoria:
# Code tested in Databricks with Delta Lake 1.0
df = parsed_body_df \
.withColumn("transactionId",
parsed_body_df.jsonBody.transactionId) \
.withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
.withColumn("providerName",
parsed_body_df.jsonBody.providerName) \
.withColumn("data", parsed_body_df.jsonBody.data)
.withColumn("documentType",
parsed_body_df.jsonBody.documentType)
Il secondo passaggio di elaborazione consiste nel generare un percorso ad Azure Data Lake Storage Gen2, in cui verranno archiviati documenti non elaborati:
# Code tested in Databricks with Delta Lake 1.0
# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName,
Suffix='', Extension=".gz"):
yy = timeStamp.year
mm = timeStamp.month
dd = timeStamp.day
hh = timeStamp.hour
mn = timeStamp.minute
Suffix = f"{Suffix}_" if Suffix != '' else ''
Name = f"{Suffix}{providerName}{Extension}"
path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
return path
GetPath = udf(lambda timestamp, transactionId, providerName,
suffix, extension: GetPathFunction(timestamp, transactionId,
providerName, suffix, extension), StringType())
df = df.withColumn("path", GetPath(col("timestamp"),
col("transactionId"), col("providerName"), col('documentType')))
Inserimento di metadati in un delta lake
I metadati vengono scritti in una tabella differenziale che consente funzionalità di query in tempo reale. Le scritture vengono trasmessi in un buffer e le query nella tabella possono unire i risultati dal buffer con quelli della parte cronologica della tabella.
Il frammento di codice seguente illustra come definire una tabella delta nel metastore e partizionarla per data:
# Code tested in Databricks with Delta Lake 1.0
DeltaTable.create(spark) \
.tableName("metadata") \
.addColumn("transactionId", LongType()) \
.addColumn("date", TimestampType()) \
.addColumn("providerName", StringType()) \
.addColumn("documentType", StringType()) \
.addColumn("path", StringType()) \
.partitionedBy("date") \
.execute()
Si noti che il campo transactionId è numerico. I messaggi tipici che passano sistemi distribuiti possono usare GUID per identificare in modo univoco le transazioni. Tuttavia, i tipi di dati numerici consentono prestazioni di query maggiori nella maggior parte delle piattaforme dati.
L'assegnazione di un identificatore di transazione univoco potrebbe risultare complessa in base alla natura distribuita delle piattaforme dati cloud( ad esempio Spark). Un approccio utile consiste nel basare tale identificatore di transazione su un identificatore di partizione (ad esempio il numero di partizione di Hub eventi) e un numero incrementale all'interno della partizione. Un esempio di questo approccio è monotonically_increasing_id() in Azure Databricks.
Il frammento di codice seguente illustra come aggiungere il flusso con i metadati dei documenti non elaborati alla tabella delta:
# Code tested in Databricks with Delta Lake 1.0
df.withColumn("date", col("timeStamp").cast(DateType())) \
.select("transactionId", "date", "providerName",
"documentType", "path") \
.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation",
"/delta/metadata/_checkpoints/metadata_checkpoint") \
.table("metadata")
Si noti che il partizionamento viene gestito durante la scrittura del flusso in base allo schema della tabella.
Inserimento di dati in un data lake
I documenti non elaborati effettivi vengono scritti in un livello di prestazioni di archiviazione appropriato in Azure Data Lake Gen2.
Il frammento di codice seguente illustra una semplice funzione per caricare un file in Azure Data Lake Store Gen2; l'uso di un metodo foreach nella DataStreamWriter
classe consente di caricare il file ospitato in ogni record del dataframe di streaming:
# Code tested in Databricks with Delta Lake 1.0
from azure.storage.filedatalake import DataLakeServiceClient
def upload_data(storage_account_name, storage_account_key,
file_system_name, file_path, data):
service_client =
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name),
credential=storage_account_key)
file_system_client =
service_client.get_file_system_client(file_system_name)
file_client =
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
if not file_client.exists:
file_client.create_file()
file_client.upload_data(data, overwrite=True)
# Process a row to upload data to ADLS
def Row2ADLS(row):
upload_data(adls_name, adls_key, adls_container, row['path'],
row['data'])
df.writeStream.foreach(Row2ADLS).start()
Client
Il client può essere un'applicazione Web personalizzata che usa i metadati per recuperare i percorsi dei documenti dalla tabella delta con istruzioni SQL standard e, a sua volta, il documento effettivo dal data lake con API standard di Azure Data Lake Storage Gen2.
Il frammento di codice seguente, ad esempio, mostra come recuperare i percorsi di tutti i documenti in una determinata transazione:
select * from metadata where transactionId = '123456'
Passaggi successivi
- Delta Lake
- Delta Lake in Azure Synapse Analytics
- Delta Lake in Azure Databricks
- Spark Structured Streaming
- Azure Synapse Analytics
- Informazioni su Delta Lake in Azure Synapse Analytics
- Guida di Azure Databricks Delta Lake e Delta Engine
- Descrivere l'architettura delta Lake di Azure Databricks
Risorse correlate
Vedere le linee guida relative all'architettura:
Vedere queste architetture correlate: