Creación de un Delta Lake para admitir consultas ad hoc en las reservas de ocio y viajes en línea

Azure Event Hubs
Azure Data Lake Storage
Azure Databricks
Azure Synapse Analytics

Esta arquitectura proporciona un Delta Lake de ejemplo para la reserva de viajes, donde se generan grandes cantidades de documentos sin procesar con una alta frecuencia.

Apache® y Apache Spark™ son marcas comerciales registradas o marcas comerciales de Apache Software Foundation en los Estados Unidos y otros países. El uso de estas marcas no implica la aprobación de Apache Software Foundation.

Architecture

Diagrama de una arquitectura Delta Lake.

Descargue un archivo Visio de esta arquitectura.

Los escenarios de reserva de ocio y viajes pueden generar grandes cantidades de documentos sin procesar con una alta frecuencia. Sin embargo, es posible que no sea necesario indexar todo el contenido de estos documentos. Por ejemplo, es posible que los usuarios necesiten buscar por un identificador de transacción conocido, o por un nombre de cliente en una fecha determinada, para recuperar un conjunto de documentos que les interesan.

Flujo de datos

El concepto detrás de esta arquitectura consiste en desvincular los metadatos útiles para buscar datos sin sistema operativo.

  • Solo se indexan los metadatos dentro de un servicio consultable (como Spark), mientras que los datos reales se almacenan en un lago de datos.
  • Los documentos sin procesar de un lago de datos están vinculados a metadatos indexados por su ruta de acceso.
  • Al consultar documentos, el servicio busca los metadatos de los documentos y, a su vez, los documentos reales se recuperarán del lago de datos por su ruta de acceso.

Esta solución reduce drásticamente los costes y aumenta el rendimiento, ya que los metadatos comprenden una fracción de todo el patrimonio de datos (por ejemplo, se pueden describir petabytes de documentos sin procesar mediante decenas de gigabytes de metadatos concisos).

Además, la administración de la combinación de la profundidad histórica y los requisitos en tiempo real en un sistema uniforme, fácil de mantener y de alto rendimiento es un desafío típico de este tipo de escenario. La arquitectura Delta Lake supera este desafío.

Componentes

Azure App Service es un servicio de plataforma como servicio (PaaS) para crear y hospedar aplicaciones en máquinas virtuales administradas. App Service gestiona la infraestructura de proceso subyacente en la que se ejecutan las aplicaciones y proporciona supervisión de las cuotas de uso de recursos y métricas de aplicaciones, registro de información de diagnóstico y alertas basadas en métricas.

Azure Data Factory es un servicio de extracción, transformación y carga de datos (ETL) de Azure que ofrece escalabilidad horizontal, integración de datos sin servidor y transformación de datos. Ofrece una interfaz de usuario sin código que favorece la creación intuitiva y una supervisión y administración desde un único panel. También puede elevar y desplazar paquetes de SQL Server Integration Services existentes (SSIS) a Azure y ejecutarlos con compatibilidad completa en Azure Data Factory.

Azure Data Lake Storage Gen2 es un conjunto de funcionalidades dedicadas al análisis de macrodatos, que se basa en Azure Blob Storage. En Data Lake Storage Gen2 convergen las funcionalidades de Azure Data Lake Storage Gen1 con Azure Blob Storage. Por ejemplo, Data Lake Storage Gen2 proporciona la semántica del sistema de archivos, la seguridad de nivel de archivo y la escala. Debido a que estas funcionalidades se basan en Blob Storage, también disfrutará de un almacenamiento por niveles de bajo coste, con funcionalidades de alta disponibilidad y recuperación ante desastres.

Azure Event Hubs es un servicio de ingesta de datos en tiempo real y totalmente administrado simple, fiable y escalable. Transmite millones de eventos por segundo desde cualquier origen para compilar canalizaciones de datos dinámicos y responder inmediatamente a los desafíos empresariales.

Azure Databricks es una plataforma de análisis basada en Apache Spark optimizada para Microsoft Azure Cloud Services. Azure Databricks ofrece tres entornos para desarrollar aplicaciones que consumen muchos datos: Databricks SQL, Databricks Data Science & Engineering y Databricks Machine Learning.

Alternativas

Como alternativa a la indexación exclusiva de metadatos, podría indexar todos los datos sin procesar de un servicio que ofrece funcionalidades de consulta, como Azure Databricks, Azure Synapse Analytics, Azure Cognitive Search o Azure Data Explorer. Este enfoque es más inmediato, pero debe tenerse en cuenta el efecto combinado del tamaño de los datos, los requisitos de rendimiento y la frecuencia de actualización, especialmente desde una perspectiva de costes.

Al contrario que sucede con el uso de un lago delta, el uso de una arquitectura Lambda mantiene los datos en tiempo real en un repositorio diferente de los datos históricos, y el cliente ejecuta la lógica para que las consultas heterogéneas sean transparentes para el usuario. La ventaja de esta solución es el conjunto mayor de servicios que puede usar (como Azure Stream Analytics y Azure SQL Database), pero la arquitectura se vuelve más compleja y el mantenimiento de la base de código es más caro.

Spark se distribuye con Azure Databricks, Azure Synapse Analytics y Azure HDInsight. Por lo tanto, esta arquitectura se podría implementar con cualquiera de estos servicios de datos de Azure, preferiblemente con una versión reciente de Spark que admita Delta Lake 0.8 o 1.0.

Detalles del escenario

La visibilidad de los datos sin procesar en escenarios de reservas de ocio y viajes en línea es importante para varios actores. Los equipos de soporte técnico supervisan los diagnósticos en tiempo real para supervisar continuamente el procesamiento de transacciones y reaccionar rápidamente ante problemas no deseados. Los ingenieros de datos supervisan la exportación de datos para la revisión de las partes interesadas y para alimentar el análisis en tiempo real. Los equipos de asistencia al cliente requieren datos históricos y recientes para gestionar las consultas y las quejas de los clientes. Por último, los departamentos jurídicos garantizan que se respeten las obligaciones de cumplimiento y se lleven a cabo acciones legales. Estos tipos de requisitos son típicos en marketplaces que agregan proveedores externos y administran las compras de usuarios. Por ejemplo, los sistemas de reservas de viajes y desplazamientos desintermedian a los usuarios y proveedores de servicios para buscar servicios, agregar ofertas significativas de proveedores y administrar reservas de usuarios.

Diagrama de un marketplace con proveedores de servicio y usuarios B2B y B2C.

Posibles casos de uso

Esta arquitectura es ideal para los sectores de viajes y la industria hotelera. Se aplica a los escenarios siguientes:

  • Recuperación rápida de documentos sin procesar en tiempo real (por ejemplo, para diagnósticos) o históricos (para cumplimiento) en su formato original.
  • Administración de petabytes de datos.
  • Garantizar un rendimiento de intervalo de segundos para los diagnósticos en tiempo real.
  • Lograr un enfoque unificado para el diagnóstico en tiempo real, las consultas históricas y el análisis de alimentación.
  • Alimentación del análisis en tiempo real de bajada.
  • Controlar los costes.
  • Obtención de datos como documentos sin procesar (por ejemplo, como archivos json, xml o csv).
  • Cuando una fracción de datos es suficiente para describir las consultas.
  • Cuando los usuarios desean recuperar documentos sin procesar completos.
  • Cuando el tamaño total de los datos requeriría escalar el sistema por encima del precio objetivo.

Es posible que esta arquitectura no sea adecuada cuando:

  • Los datos se autoaprovisionan como conjuntos de registros.
  • Los usuarios deben ejecutar análisis.
  • Los usuarios están dispuestos a usar su propia herramienta de inteligencia empresarial integrada.
  • El tamaño de los datos no es un problema desde una perspectiva de costes.

Los documentos sin procesar no son necesarios.

Consideraciones

Estas consideraciones implementan los pilares del marco de buena arquitectura de Azure, que es un conjunto de principios guía que se pueden usar para mejorar la calidad de una carga de trabajo. Para más información, consulte Marco de buena arquitectura de Microsoft Azure.

Eficiencia del rendimiento

La eficiencia del rendimiento es la capacidad de la carga de trabajo para escalar con el fin de satisfacer de manera eficiente las demandas que los usuarios hayan ejercido sobre ella. Para obtener más información, vea Resumen del pilar de eficiencia del rendimiento.

Los usuarios realizarán un salto doble para acceder a los datos. Primero consultarán los metadatos y, a continuación, recuperarán el conjunto de documentos deseado. Puede ser difícil reutilizar recursos de cliente existentes o empaquetados.

Entre los niveles de acceso de Azure Data Lake Storage Gen2 se incluyen el nivel de acceso frecuente, esporádico y de archivo. En escenarios en los que los documentos se recuperan ocasionalmente, el nivel de rendimiento esporádico debe garantizar un rendimiento similar al nivel de rendimiento de acceso frecuente, pero con la ventaja de reducir los costes. En escenarios en los que la probabilidad de recuperación es mayor con los datos más recientes, considere la posibilidad de combinar los niveles de acceso frecuente y esporádico. El uso del almacenamiento de nivel de archivo también podría proporcionar una alternativa a la eliminación permanente, así como reducir el tamaño de los datos, manteniendo solo información significativa o más datos agregados.

El lago de datos podría administrar petabytes de datos, por lo cual suelen aplicarse las directivas de retención de datos. Deben utilizarse soluciones de gobernanza de datos para administrar el ciclo de vida de los datos, como, por ejemplo, cuándo mover datos antiguos entre niveles de almacenamiento de acceso habitual o esporádico, es decir, cuándo eliminar o archivar datos antiguos y cuándo agregar información en una solución de análisis de nivel inferior.

Considere cómo podría funcionar este enfoque con escenarios de análisis de nivel inferior. Aunque esta carga de trabajo de ejemplo no está pensada para el análisis, es adecuada para alimentar el análisis en tiempo real de nivel inferior, mientras que los escenarios por lotes podrían alimentarse desde data lake en su lugar.

Escalabilidad

Azure Event Hubs es muy versátil cuando se trata de desvincular un sistema transaccional que genera documentos sin procesar a partir de un sistema de diagnóstico y cumplimiento. Es fácil de implementar en arquitecturas ya establecidas; y, en última instancia, es fácil de usar. Sin embargo, es posible que el sistema transaccional ya utilice el patrón de streaming para procesar los documentos entrantes. En ese caso, es probable que deba integrar la lógica para administrar el diagnóstico y el cumplimiento en la aplicación de streaming como una transmisión secundaria.

DevOps

Para implementar automáticamente los servicios usados en esta carga de trabajo de ejemplo, es mejor usar procesos de integración continua e implementación continua (CI/CD). Considere el uso de una solución de tipo Azure DevOps o Acciones de GitHub.

Optimización de costos

La optimización de costos trata de buscar formas de reducir los gastos innecesarios y mejorar las eficiencias operativas. Para más información, vea Información general del pilar de optimización de costos.

En general, use la calculadora de precios de Azure para calcular los costos. Consulte la sección de costes del Marco de buena arquitectura de Microsoft Azure para obtener información sobre otras consideraciones.

Implementación de este escenario

En la arquitectura de ejemplo siguiente, se supone que uno o más espacios de nombres Azure Event Hubs contendrán documentos sin formato estructurados (tales como archivos json o xml). Sin embargo, el tipo y el formato reales de los documentos y los servicios de origen, así como su tipo de integración, dependen en gran medida del escenario y la arquitectura específicos.

Streaming

Con Spark Structured Streaming, los datos sin procesar se extraen, descomprimen, analizan y traducen a datos tabulares en un dataframe de streaming.

El siguiente fragmento de código de PySpark se utiliza para cargar un dataframe de streaming desde Event Hubs:

# Code tested in Databricks with Delta Lake 1.0
eh_connstr = <your_conn_str>
eh_consumergroup = <your_consumer_group>
ehConf = {}
ehConf['eventhubs.connectionString'] = 
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(eh_conn
str)
ehConf['eventhubs.consumerGroup'] = eh_consumergroup

streaming_df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

El siguiente fragmento de código se utiliza para procesar el dataframe de streaming. Primero descomprime el mensaje de Event Hubs si es necesario y, a continuación, analiza su estructura JSON en un formato tabular. Este código es un ejemplo y debe adaptarse a su escenario específico:

# Code tested in Databricks with Delta Lake 1.0

# defines an UDF to unzip the Event Hubs Body field, assuming it 
is gzipped

import zlib
def DecompressFunction(data):
  decoded_data = zlib.decompress(bytes(data), 15+32)
  return decoded_data.decode()

Decompress = udf(lambda body: DecompressFunction(body), 
StringType())
decoded_body_df = streaming_df.withColumn("DecodedBody", 
Decompress(col("body"))).select("DecodedBody")

# Parse json message from Event Hubs body, assuming the raw 
document is stored in the data field, and the others fields hold 
some metadata about it

schema = StructType([ \
    StructField("transactionId", LongType(),True), \
    StructField("timestamp",TimestampType(),True), \
    StructField("providerName", StringType(),True), \
    StructField("document", StringType(),True), \
    StructField("documentType", StringType(),True)
  ])

parsed_body_df = decoded_body_df.withColumn("jsonBody", 
from_json(col("DecodedBody"), schema)).select("jsonBody")

El procesamiento de datos real consta de dos pasos. El primero es extraer metadatos para ayudar a buscar los documentos sin procesar después del procesamiento. Los metadatos reales dependen del caso de uso, pero los ejemplos generalizables serían fechas e identificadores pertinentes, tipos de documento, servicio de origen y cualquier tipo de categoría:

# Code tested in Databricks with Delta Lake 1.0

df = parsed_body_df \
    .withColumn("transactionId", 
parsed_body_df.jsonBody.transactionId) \
    .withColumn("timestamp", parsed_body_df.jsonBody.timestamp) \
    .withColumn("providerName", 
parsed_body_df.jsonBody.providerName) \
    .withColumn("data", parsed_body_df.jsonBody.data)
    .withColumn("documentType", 
parsed_body_df.jsonBody.documentType)

El segundo paso del procesamiento consiste en generar una ruta de acceso a Azure Data Lake Storage Gen2, donde almacenará los documentos sin procesar:

# Code tested in Databricks with Delta Lake 1.0

# A function to generate a path
def GetPathFunction(timeStamp, transactionId, providerName, 
Suffix='', Extension=".gz"):
  yy = timeStamp.year
  mm = timeStamp.month
  dd = timeStamp.day
  hh = timeStamp.hour
  mn = timeStamp.minute
  Suffix = f"{Suffix}_" if Suffix != '' else ''
  Name = f"{Suffix}{providerName}{Extension}"
  path = f"/{yy}/{mm}/{dd}/{hh}/{mn}/{transactionId}/{Name}"
  return path

GetPath = udf(lambda timestamp, transactionId, providerName, 
suffix, extension: GetPathFunction(timestamp, transactionId, 
providerName, suffix, extension), StringType())

df = df.withColumn("path", GetPath(col("timestamp"), 
col("transactionId"), col("providerName"), col('documentType')))

Ingesta de metadatos en un lago delta

Los metadatos se escriben en una tabla delta que admite funcionalidades de consulta en tiempo real. Las escrituras se transmiten en un búfer y las consultas a la tabla pueden combinar los resultados del búfer con los de la parte histórica de la tabla.

El siguiente fragmento de código muestra cómo definir una tabla delta en el metastore y particionarla por fecha:

# Code tested in Databricks with Delta Lake 1.0

DeltaTable.create(spark) \
   .tableName("metadata") \
   .addColumn("transactionId", LongType()) \
   .addColumn("date", TimestampType()) \
   .addColumn("providerName", StringType()) \
   .addColumn("documentType", StringType()) \
   .addColumn("path", StringType()) \
   .partitionedBy("date") \
   .execute()

Tenga en cuenta que el campo de identidad de la transacción es numérico. En su lugar, los mensajes típicos que pasan sistemas distribuidos pueden utilizar GUID para identificar de forma única las transacciones. Sin embargo, los tipos de datos numéricos permiten un mayor rendimiento de las consultas en la mayoría de las plataformas de datos.

La asignación de un identificador de transacción único puede ser un desafío, dada la naturaleza distribuida de las plataformas de datos en la nube (como Spark). Un enfoque útil es basar este identificador de transacción en un identificador de partición (como, por ejemplo, el número de partición Event Hubs) y un número incremental dentro de la partición. Un ejemplo de este enfoque es monotonically_increasing_id() en Azure Databricks.

El siguiente fragmento de código muestra cómo anexar la secuencia con metadatos de documentos sin procesar a la tabla delta:

# Code tested in Databricks with Delta Lake 1.0

df.withColumn("date", col("timeStamp").cast(DateType())) \
    .select("transactionId", "date", "providerName", 
"documentType", "path") \
    .writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", 
"/delta/metadata/_checkpoints/metadata_checkpoint") \
    .table("metadata")

Tenga en cuenta que la creación de particiones se administra al escribir la secuencia según el esquema de tabla.

Ingesta de datos en un lago de datos

Los documentos sin procesar reales se escriben en un nivel de rendimiento de almacenamiento adecuado en Azure Data Lake Gen2.

El siguiente fragmento de código muestra una función sencilla para cargar un archivo en Azure Data Lake Store Gen2; la utilización de un método foreach en la clase DataStreamWriterpermite cargar el archivo hospedado en cada registro del dataframe de streaming:

# Code tested in Databricks with Delta Lake 1.0

from azure.storage.filedatalake import DataLakeServiceClient

def upload_data(storage_account_name, storage_account_key, 
file_system_name, file_path, data):

  service_client = 
DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".
format("https", storage_account_name), 
credential=storage_account_key)

  file_system_client = 
service_client.get_file_system_client(file_system_name)
  file_client = 
service_client.get_file_client(file_system_client.file_system_nam
e, file_path)
    
  if not file_client.exists:
    file_client.create_file()      

  file_client.upload_data(data, overwrite=True)
  
# Process a row to upload data to ADLS
def Row2ADLS(row):
  upload_data(adls_name, adls_key, adls_container, row['path'], 
row['data'])

df.writeStream.foreach(Row2ADLS).start()

Cliente

El cliente puede ser una aplicación web personalizada que usa metadatos para recuperar rutas de acceso de documentos de la tabla delta con instrucciones SQL estándar y, a su vez, el documento real del lago de datos con las API estándar de Azure Data Lake Storage Gen2.

El siguiente fragmento de código, por ejemplo, muestra cómo recuperar las rutas de acceso de todos los documentos de una determinada transacción:

select * from metadata where transactionId = '123456'

Pasos siguientes

Consulte la guía de arquitectura relacionada:

Consulte estas arquitecturas relacionadas: