Procesamiento de flujos de datos con Azure Databricks
Esta arquitectura de referencia muestra una canalización de procesamiento de flujos de datos de un extremo a otro. Las cuatro fases de esta canalización son la ingesta, el proceso, el almacenamiento y el análisis y el informe. Para esta arquitectura de referencia, la canalización ingiere datos de dos orígenes, realiza una combinación de los registros relacionados de cada flujo, enriquece el resultado y calcula un promedio en tiempo real. A continuación, los resultados se almacenan para su posterior análisis.
Hay disponible una implementación de referencia de esta arquitectura en GitHub.
Arquitectura
Descargar un archivo de Visio de esta arquitectura.
Flujo de trabajo
El siguiente flujo de datos corresponde al diagrama anterior:
En esta arquitectura, hay dos orígenes de datos que generan flujos de datos en tiempo real. La primera secuencia contiene información de carreras y la segunda secuencia contiene información de tarifas. La arquitectura de referencia incluye un generador de datos simulado que lee de un conjunto de archivos estáticos e inserta los datos en Azure Event Hubs. Los orígenes de datos de una aplicación real son dispositivos instalados en los taxis.
Event Hubs es un servicio de ingesta de eventos. Esta arquitectura emplea dos instancias de centro de eventos, uno para cada origen de datos. Cada origen de datos envía un flujo de datos al centro de eventos asociado.
Azure Databricks es una plataforma de análisis basada en Apache Spark optimizada para la plataforma de servicios en la nube de Microsoft Azure. Azure Databricks se usa para correlacionar los datos de carreras y tarifas de taxi y para enriquecer los datos correlacionados con los datos de vecindario almacenados en el sistema de archivos de Azure Databricks.
Azure Cosmos DB es un servicio de base de datos de varios modelos totalmente administrado. La salida de un trabajo de Azure Databricks es una serie de registros, que se escriben en Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra se usa porque admite el modelado de datos de serie temporal.
Azure Synapse Link para Azure Cosmos DB permite ejecutar análisis casi en tiempo real en datos operativos en Azure Cosmos DB, sin ningún efecto de rendimiento o costo en la carga de trabajo transaccional. Puede lograr estos resultados mediante grupos de SQL sin servidor y grupos de Spark. Estos motores de análisis están disponibles en el área de trabajo de Azure Synapse Analytics.
La creación de reflejo de Azure Cosmos DB para NoSQL en Microsoft Fabric le permite integrar datos de Azure Cosmos DB con el resto de los datos en Microsoft Fabric.
Log Analytics es una herramienta de Azure Monitor que permite consultar y analizar datos de registro de varios orígenes. Los datos de registro de aplicaciones que recopila Azure Monitor se almacenan en un área de trabajo de Log Analytics. Puede usar consultas de Log Analytics para analizar y visualizar métricas e inspeccionar los mensajes de registro para identificar problemas dentro de la aplicación.
Detalles del escenario
Una empresa de taxis recopila datos sobre cada viaje de taxi. En este escenario, se supone que dos dispositivos independientes envían datos. El taxi tiene un medidor que envía información sobre cada viaje, incluida la duración, la distancia y las ubicaciones de recogida y entrega. Un dispositivo independiente acepta los pagos de clientes y envía los datos sobre las tarifas. Para detectar las tendencias de los corredores, la compañía de taxis quiere calcular el promedio de propinas por milla controladas por cada vecindario, en tiempo real.
Ingesta de datos
Para simular un origen de datos, esta arquitectura de referencia usa el conjunto de datos de datos de taxis de la ciudad de Nueva York1. Este conjunto de datos contiene datos sobre los viajes de taxi en la ciudad de Nueva York de 2010 a 2013. Contiene registros de datos de carreras y tarifas. Los datos de carreras incluyen la duración del viaje, la distancia de viaje y las ubicaciones de recogida y entrega. Los datos de tarifas incluyen las tarifas, los impuestos y las propinas. Los campos de ambos tipos de registro incluyen el número de medallion, la licencia hack y el identificador de proveedor. La combinación de estos tres campos identifica de forma única un taxi y un conductor. Los datos se almacenan en formato CSV.
[1] Donovan, Brian; Trabajo, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidad de Illinois en Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
El generador de datos es una aplicación de .NET Core que lee los registros y los envía a Event Hubs. El generador envía los datos de carreras en formato JSON y los datos de tarifas en formato CSV.
Event Hubs usa particiones para segmentar los datos. Las particiones permiten a los consumidores leer cada partición en paralelo. Al enviar datos a Event Hubs, puede especificar la clave de partición directamente. En caso contrario, los registros se asignan a las particiones en modo round-robin.
En este escenario, los datos de carreras y los datos de tarifas deben asignarse el mismo identificador de partición para un taxi específico. Esta asignación permite a Databricks aplicar un grado de paralelismo cuando correlaciona las dos secuencias. Por ejemplo, un registro de la partición n de los datos de carreras coincide con un registro en la partición n de los datos de tarifas.
Descargue un archivo de Visio de esta arquitectura.
En el generador de datos, el modelo de datos común para ambos tipos de registro tiene una propiedad PartitionKey
que es la concatenación de Medallion
, HackLicense
y VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Esta propiedad se usa para proporcionar una clave de partición explícita cuando envía datos a Event Hubs.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Event Hubs
La capacidad de procesamiento de Event Hubs se mide en unidades de procesamiento. Para escalar automáticamente un centro de eventos, habilite el inflado automático. Esta característica escala automáticamente las unidades de rendimiento en función del tráfico, hasta un máximo configurado.
Procesamiento de flujos
En Azure Databricks, un trabajo realiza el procesamiento de datos. El trabajo se asigna a un clúster y, a continuación, se ejecuta en él. El trabajo puede ser código personalizado escrito en Java o en un cuaderno de Spark.
En esta arquitectura de referencia, el trabajo es un archivo de Java que tiene clases escritas en Java y Scala. Al especificar el archivo de Java para un trabajo de Databricks, el clúster de Databricks especifica la clase para la operación. Aquí, el método main
de la clase com.microsoft.pnp.TaxiCabReader
contiene la lógica de procesamiento de datos.
Lee la secuencia de las dos instancias del centro de eventos.
La lógica de procesamiento de datos usa streaming estructurado de Spark para leer de las dos instancias de Azure Event Hub:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Enriquecer los datos con la información del vecindario
Los datos de carreras incluyen las coordenadas de latitud y longitud de las ubicaciones de recogida y entrega. Estas coordenadas son útiles, pero no se consumen fácilmente para el análisis. Por lo tanto, estos datos se enriquecen con los datos de vecindario que se leen de un archivo shape.
El formato shapefile es binario y no se analiza fácilmente. Pero la biblioteca GeoTools proporciona herramientas para datos geoespaciales que usan el formato shapefile. Esta biblioteca se usa en la com.microsoft.pnp.GeoFinder
clase para determinar el nombre del vecindario en función de las coordenadas de las ubicaciones de recogida y entrega.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Unirse a los datos de carreras y tarifas
En primer lugar, se transforman los datos de carreras y tarifas:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
A continuación, los datos del viaje se unen con los datos de tarifas:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Procesamiento de los datos e inserción en Azure Cosmos DB
El importe medio de tarifas para cada vecindario se calcula para un intervalo de tiempo específico:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
A continuación, el importe medio de la tarifa se inserta en Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Consideraciones
Estas consideraciones implementan los pilares del Azure Well-Architected Framework, que es un conjunto de principios rectores que puede utilizar para mejorar la calidad de una carga de trabajo. Para más información, consulte Marco de buena arquitectura de Microsoft Azure.
Seguridad
La seguridad proporciona garantías contra ataques deliberados y el uso indebido de sus valiosos datos y sistemas. Para obtener más información, vea Lista de comprobación de revisión de diseño para security.
El acceso al área de trabajo de Azure Databricks se controla mediante la consola de administrador. La consola de administrador incluye funcionalidad para agregar usuarios, administrar permisos de usuario y configurar el inicio de sesión único. El control del acceso para las áreas de trabajo, los clústeres, los trabajos y las tablas también se puede establecer en la consola de administrador.
Administración de secretos
Azure Databricks incluye un almacén de secretos que se usa para almacenar credenciales y hacer referencia a ellas en cuadernos y trabajos. Ámbito de los secretos de partición dentro del almacén de secretos de Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Los secretos se agregan en el nivel de ámbito:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Nota:
Use un ámbito respaldado por Azure Key Vault en lugar del ámbito nativo de Azure Databricks.
En el código, se obtiene acceso a los secretos mediante las utilidades de secretos de Azure Databricks.
Optimización de costos
La optimización de costos se centra en formas de reducir los gastos innecesarios y mejorar las eficiencias operativas. Para obtener más información, consulte Lista de comprobación de revisión de diseño para la optimización de costos.
Puede usar la calculadora de precios de Azure para calcular los costos. Tenga en cuenta los siguientes servicios usados en esta arquitectura de referencia.
Consideraciones sobre los costos de Event Hubs
Esta arquitectura de referencia implementa Event Hubs en el nivel Estándar. El modelo de precios se basa en las unidades de procesamiento, los eventos de entrada y los eventos de captura. Un evento de entrada es una unidad de datos de 64 KB o menos. Los mensajes más grandes se facturan en múltiplos de 64 KB. Las unidades de procesamiento se especifican a través de Azure Portal o de las API de administración de Events Hubs.
Si necesita más días de retención, tenga en cuenta el nivel Dedicado. Este nivel proporciona implementaciones de un solo inquilino que tienen requisitos estrictos. Esta oferta crea un clúster basado en unidades de capacidad y no depende de las unidades de rendimiento. El nivel Estándar también se factura en función de los eventos de entrada y las unidades de rendimiento.
Para más información, consulte Precios de Event Hubs.
Consideraciones sobre los costos de Azure Databricks
Azure Databricks proporciona el nivel Estándar y el nivel Premium, que admiten tres cargas de trabajo. Esta arquitectura de referencia implementa un área de trabajo de Azure Databricks en el nivel Premium.
Las cargas de trabajo de ingeniería de datos deben ejecutarse en un clúster de trabajos. Los ingenieros de datos usan clústeres para compilar y realizar trabajos. Las cargas de trabajo de análisis de datos deben ejecutarse en un clúster de uso completo y están diseñadas para que los científicos de datos exploren, visualicen, manipulen y compartan datos e información de forma interactiva.
Azure Databricks proporciona varios modelos de precios.
Plan de pago por uso
Se le facturan las máquinas virtuales (VM) aprovisionadas en clústeres y unidades de Azure Databricks (DTU) en función de la instancia de máquina virtual elegida. Un DBU es una unidad de capacidad de procesamiento que se factura por uso por segundo. El consumo de DBU depende del tamaño y el tipo de instancia que se ejecuta en Azure Databricks. Los precios dependen de la carga de trabajo y el nivel elegidos.
Plan de compra previa
Se compromete a las DTU como unidades de confirmación de Azure Databricks durante uno o tres años para reducir el costo total de propiedad durante ese período de tiempo en comparación con el modelo de pago por uso.
Para más información, consulte Precios de Azure Databricks.
Consideraciones sobre los costos de Azure Cosmos DB
En esta arquitectura, el trabajo de Azure Databricks escribe una serie de registros en Azure Cosmos DB. Se le cobra por la capacidad que reserva, que se mide en Unidades de solicitud por segundo (RU/s). Esta capacidad se usa para realizar operaciones de inserción. La unidad de facturación es de 100 RU/s por hora. Por ejemplo, el costo de escribir elementos de 100 KB es de 50 RU/s.
En cuanto a operaciones de escritura, aprovisione capacidad suficiente para admitir el número de escrituras necesarias por segundo. Puede aumentar el rendimiento aprovisionado mediante el portal o la CLI de Azure antes de realizar operaciones de escritura y, a continuación, reducir el rendimiento una vez completadas esas operaciones. El rendimiento del período de escritura es la suma del rendimiento mínimo necesario para los datos específicos y el rendimiento necesario para la operación de inserción. Este cálculo supone que no hay ninguna otra carga de trabajo en ejecución.
Ejemplo de análisis de costos
Supongamos que configura un valor de rendimiento de 1000 RU/s en un contenedor. Se implementa durante 24 horas durante 30 días, durante un total de 720 horas.
El contenedor se factura a 10 unidades de 100 RU/s por hora por cada hora. Diez unidades a $0,008 (por 100 RU/s por hora) se cobran a $0,08 por hora.
Durante 720 horas o 7.200 unidades (de 100 RU), se le facturan 57,60 USD por mes.
El almacenamiento también se factura por cada GB que se usa para los datos almacenados y el índice. Para más información, consulte el modelo de precios de Azure Cosmos DB.
Use la calculadora de capacidad de Azure Cosmos DB para obtener una estimación rápida del costo de la carga de trabajo.
Excelencia operativa
La excelencia operativa abarca los procesos de operaciones que implementan una aplicación y lo mantienen en ejecución en producción. Para obtener más información, vea Lista de comprobación de revisión de diseño para la excelencia operativa.
Supervisión
Azure Databricks se basa en Apache Spark. Tanto Azure Databricks como Apache Spark usan Apache Log4j como biblioteca estándar para el registro. Además del registro predeterminado que proporciona Apache Spark, puede implementar el registro en Log Analytics. Para obtener más información, consulte Supervisión de Azure Databricks.
A medida que la com.microsoft.pnp.TaxiCabReader
clase procesa los mensajes de carreras y tarifas, es posible que un mensaje tenga un formato incorrecto y, por tanto, no sea válido. En un entorno de producción, es importante analizar estos mensajes con formato incorrecto para identificar un problema con los orígenes de datos para que se pueda corregir rápidamente para evitar la pérdida de datos. La com.microsoft.pnp.TaxiCabReader
clase registra un acumulador de Apache Spark que realiza un seguimiento del número de registros de tarifas con formato incorrecto y registros de carreras:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark usa la biblioteca Dropwizard para enviar métricas. Algunos de los campos nativos de métricas dropwizard no son compatibles con Log Analytics, por lo que esta arquitectura de referencia incluye un receptor y un reportera personalizados de Dropwizard. Da formato a las métricas en el formato que Log Analytics espera. Cuando Apache Spark informa de las métricas, también se envían las métricas personalizadas de los datos de carreras y tarifas con formato incorrecto
Puede usar las siguientes consultas de ejemplo en el área de trabajo de Log Analytics para supervisar el funcionamiento del trabajo de streaming. El argumento ago(1d)
de cada consulta devuelve todos los registros que se generaron en el último día. Puede ajustar este parámetro para ver un período de tiempo diferente.
Excepciones registradas durante la operación de consulta de flujo
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Acumulación de datos de carrera y tarifas con formato incorrecto
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operación de trabajo a lo largo del tiempo
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organización e implementaciones de recursos
Cree grupos de recursos independientes para entornos de producción, desarrollo y pruebas. Los grupos de recursos independientes facilitan la administración de implementaciones, la eliminación de implementaciones de prueba y la asignación de derechos de acceso.
Use la plantilla de Azure Resource Manager para implementar los recursos de Azure según el proceso de infraestructura como código. Mediante el uso de plantillas, puede automatizar fácilmente las implementaciones con los servicios de Azure DevOps u otras soluciones de integración continua y entrega continua (CI/CD).
Coloque cada carga de trabajo en una plantilla de implementación independiente y almacene los recursos en los sistemas de control de código fuente. Puede implementar las plantillas en conjunto o por separado como parte de un proceso de CI/CD. Este enfoque simplifica el proceso de automatización.
En esta arquitectura, Event Hubs, Log Analytics y Azure Cosmos DB se identifican como una sola carga de trabajo. Estos recursos se incluyen en una sola plantilla de Azure Resource Manager.
Considere la posibilidad de almacenar provisionalmente las cargas de trabajo. Implemente en varias fases y ejecute comprobaciones de validación en cada fase antes de pasar a la siguiente fase. De este modo, puede controlar cómo insertar actualizaciones en los entornos de producción y minimizar los problemas de implementación imprevistos.
En esta arquitectura, hay varias fases de implementación. Considere la posibilidad de crear una canalización de Azure DevOps y agregar esas fases. Puede automatizar las siguientes fases:
- Inicie un clúster de Databricks.
- Configuración de la CLI de Databricks.
- Instale las herramientas de Scala.
- Agregue los secretos de Databricks.
Considere la posibilidad de escribir pruebas de integración automatizadas para mejorar la calidad y confiabilidad del código de Databricks y su ciclo de vida.
Implementación de este escenario
Para implementar y ejecutar la implementación de referencia, siga los pasos descritos en el archivo Léame de GitHub.