Compartir a través de


Procesamiento de flujos de datos con Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Esta arquitectura de referencia muestra una canalización de procesamiento de flujos de datos de un extremo a otro. Las cuatro fases de esta canalización incluyen la ingesta, el proceso, el almacenamiento y el análisis y el informe. Para esta arquitectura de referencia, la canalización ingiere datos de dos orígenes, realiza una combinación de los registros relacionados de cada flujo, enriquece el resultado y calcula un promedio en tiempo real. A continuación, los resultados se almacenan para su posterior análisis.

Arquitectura

Diagrama que muestra una arquitectura de referencia para el procesamiento de flujos con Azure Databricks.

Descargar un archivo de Visio de esta arquitectura.

Flujo de datos

El siguiente flujo de datos corresponde al diagrama anterior:

  1. Ingest

    Dos flujos de datos operativos en tiempo real alimentan el sistema: datos de tarifas y datos de carreras. Los dispositivos instalados en taxis actúan como orígenes de datos y publican eventos en Azure Event Hubs. Cada secuencia va a su propia instancia de hub de eventos, que proporciona rutas de ingesta independientes.

  2. Proceso

    Azure Databricks consume flujos de Event Hubs y ejecuta las siguientes operaciones:

    • Correlaciona los registros de tarifas con los registros de viajes
    • Enriquece los datos mediante un tercer conjunto de datos que contiene datos de búsqueda de vecindario almacenados en el sistema de archivos de Azure Databricks.

    Este proceso genera un conjunto de datos unificado enriquecido adecuado para el análisis y el almacenamiento de nivel inferior.

  3. Almacén

    La salida de los trabajos de Azure Databricks es una serie de registros. Los registros procesados se escriben en Azure Cosmos DB para NoSQL.

  4. Análisis o informe

    Fabric refleja los datos operativos de Azure Cosmos DB para NoSQL para habilitar consultas analíticas sin afectar al rendimiento transaccional. Este enfoque proporciona una ruta de acceso sin ETL para el análisis. En esta arquitectura, puede usar la creación de reflejo para los siguientes fines:

    • Espejar datos de Azure Cosmos DB (o datos en formato Delta) en Fabric
    • Mantener los conjuntos de datos sincronizados con el sistema operativo
    • Habilite el análisis a través de las siguientes herramientas:
      • Puntos de conexión de análisis SQL de Fabric para almacenes de datos y lakehouses.
      • Cuadernos de Apache Spark
      • Análisis en tiempo real mediante el lenguaje de consulta Kusto (KQL) para la exploración de series temporales y estilo de registro
  5. Monitor

    Azure Monitor recopila telemetría de la canalización de procesamiento de Azure Databricks. Un área de trabajo de Log Analytics almacena métricas y registros de aplicaciones. Puede realizar las siguientes acciones:

    • Consulta de registros operativos
    • Visualización de métricas
    • Inspección de errores, anomalías y problemas de rendimiento
    • Crear cuadros de mando

Components

  • Azure Databricks es una plataforma de análisis basada en Spark optimizada para la plataforma Azure. En esta arquitectura, los trabajos de Azure Databricks enriquecen los datos de carreras y tarifas de taxi y almacenan los resultados en Azure Cosmos DB.

  • Event Hubs es un servicio de ingesta distribuido administrado que se puede escalar para ingerir grandes cantidades de eventos. Esta arquitectura usa dos instancias del centro de eventos para recibir datos de taxis.

  • Azure Cosmos DB para NoSQL es un servicio de base de datos administrado y de varios modelos. En esta arquitectura, almacena la salida de los trabajos de enriquecimiento de Azure Databricks. Fabric refleja los datos operativos de Azure Cosmos DB para habilitar consultas analíticas.

  • Log Analytics es una herramienta de Azure Monitor que le ayuda a consultar y analizar datos de registro de varios orígenes. En esta arquitectura, todos los recursos configuran Azure Diagnostics para almacenar registros de plataforma en esta área de trabajo. El área de trabajo también actúa como receptor de datos para las métricas de trabajo de Spark emitidas desde las canalizaciones de procesamiento de Azure Databricks.

Detalles del escenario

Una empresa de taxis recopila datos sobre cada viaje de taxi. En este escenario, se supone que dos dispositivos independientes envían datos. El taxi tiene un medidor que envía información sobre cada viaje, incluida la duración, la distancia y las ubicaciones de recogida y entrega. Un dispositivo independiente acepta los pagos de clientes y envía los datos sobre las tarifas. Para detectar las tendencias de los corredores, la compañía de taxis quiere calcular el promedio de propinas por milla controladas por cada vecindario, en tiempo real.

Ingesta de datos

Para simular un origen de datos, esta arquitectura de referencia usa el conjunto de datos de datos de taxis de la ciudad de Nueva York. Este conjunto de datos contiene datos sobre los viajes de taxi en la ciudad de Nueva York de 2010 a 2013. Contiene registros de datos de carreras y tarifas. Los datos de carreras incluyen la duración del viaje, la distancia de viaje y las ubicaciones de recogida y entrega. Los datos de tarifas incluyen las tarifas, los impuestos y las propinas. Los campos de ambos tipos de registro incluyen el número de medallion, la licencia hack y el identificador de proveedor. La combinación de estos tres campos identifica de forma única un taxi y un conductor. Los datos se almacenan en formato CSV.

El generador de datos es una aplicación de .NET Core que lee los registros y los envía a Event Hubs. El generador envía los datos de carreras en formato JSON y los datos de tarifas en formato CSV.

Event Hubs usa particiones para segmentar los datos. Las particiones permiten que un consumidor lea cada dato de lectura en paralelo. Al enviar datos a Event Hubs, puede especificar la clave de partición directamente. En caso contrario, los registros se asignan a las particiones en modo round-robin.

En este escenario, los datos de carreras y los datos de tarifas deben asignarse el mismo identificador de partición para un taxi específico. Esta asignación permite a Databricks aplicar un grado de paralelismo cuando correlaciona las dos secuencias. Por ejemplo, un registro de la partición n de los datos de carreras coincide con un registro en la partición n de los datos de tarifas.

Diagrama de procesamiento de flujos de datos con Azure Databricks y Event Hubs.

Descargue un archivo de Visio de esta arquitectura.

En el generador de datos, el modelo de datos común para ambos tipos de registro tiene una propiedad PartitionKey que es la concatenación de Medallion, HackLicense y VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propiedad proporciona una clave de partición explícita cuando envía datos a Event Hubs.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

La capacidad de procesamiento de Event Hubs se mide en unidades de procesamiento. Para escalar automáticamente un centro de eventos, habilite la inflación automática. Esta característica escala automáticamente las unidades de rendimiento en función del tráfico, hasta un máximo configurado.

Procesamiento de flujos

En Azure Databricks, un trabajo realiza el procesamiento de datos. El trabajo se asigna a un clúster y, a continuación, se ejecuta en él. El trabajo puede ser código personalizado escrito en Java o en un cuaderno de Spark.

En esta arquitectura de referencia, el trabajo es un archivo de Java que tiene clases escritas en Java y Scala. Al especificar el archivo de Java para un trabajo de Azure Databricks, el clúster de Azure Databricks especifica la clase para la operación. Aquí, el método main de la clase com.microsoft.pnp.TaxiCabReader contiene la lógica de procesamiento de datos.

Lee la secuencia de las dos instancias del centro de eventos.

La lógica de procesamiento de datos usa streaming estructurado de Spark para leer de las dos instancias de Azure Event Hub:

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Enriquecer los datos con la información del vecindario

Los datos de carreras incluyen las coordenadas de latitud y longitud de las ubicaciones de recogida y entrega. Estas coordenadas son útiles, pero no se consumen fácilmente para el análisis. Por lo tanto, el pipeline enriquece estos datos con los datos del vecindario leídos desde un shapefile.

El formato shapefile es binario y no se analiza fácilmente. Pero la biblioteca GeoTools proporciona herramientas para datos geoespaciales que usan el formato shapefile. Esta biblioteca se usa en la com.microsoft.pnp.GeoFinder clase para determinar el nombre del vecindario en función de las coordenadas de las ubicaciones de recogida y entrega.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Unirse a los datos de carreras y tarifas

En primer lugar, se transforman los datos de carreras y tarifas:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

A continuación, los datos del viaje se unen con los datos de tarifas:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Procesamiento de los datos e inserción en Azure Cosmos DB

El importe medio de tarifas para cada vecindario se calcula para un intervalo de tiempo específico:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

A continuación, el importe medio de la tarifa se inserta en Azure Cosmos DB:

maxAvgFarePerNeighborhood
  .writeStream
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
  .option("spark.cosmos.accountKey", "<your-cosmos-key>")
  .option("spark.cosmos.database", "<your-database-name>")
  .option("spark.cosmos.container", "<your-container-name>")
  .option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
  .outputMode("append")
  .start()
  .awaitTermination()

Consideraciones

Estas consideraciones implementan los pilares del Azure Well-Architected Framework, que es un conjunto de principios rectores que puede utilizar para mejorar la calidad de una carga de trabajo. Para obtener más información, consulte Marco de buena arquitectura.

Seguridad

La seguridad proporciona garantías contra ataques deliberados y el uso indebido de sus valiosos datos y sistemas. Para obtener más información, vea Lista de comprobación de revisión de diseño para security.

El acceso al área de trabajo de Azure Databricks se controla mediante la consola de administrador. La consola de administrador incluye funcionalidad para agregar usuarios, administrar permisos de usuario y configurar el inicio de sesión único. El control del acceso para las áreas de trabajo, los clústeres, los trabajos y las tablas también se puede establecer en la consola de administrador.

Administración de secretos

Azure Databricks incluye un almacén de secretos que se usa para almacenar credenciales y hacer referencia a ellas en cuadernos y trabajos. Ámbito de los secretos de partición dentro del almacén de secretos de Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Los secretos se agregan en el nivel de ámbito:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota:

Use un ámbito respaldado por Azure Key Vault en lugar del ámbito nativo de Azure Databricks.

El código accede a los secretos a través de las utilidades de secretos de Azure Databricks.

Optimización de costos

La optimización de costos se centra en formas de reducir los gastos innecesarios y mejorar las eficiencias operativas. Para obtener más información, consulte Lista de comprobación de revisión de diseño para la optimización de costos.

Puede usar la calculadora de precios de Azure para calcular los costos. Tenga en cuenta los siguientes servicios usados en esta arquitectura de referencia.

Consideraciones sobre los costos de Event Hubs

Esta arquitectura de referencia implementa Event Hubs en el nivel Estándar. El modelo de precios se basa en las unidades de procesamiento, los eventos de entrada y los eventos de captura. Un evento de entrada es una unidad de datos de 64 KB o menos. Los mensajes más grandes se facturan en múltiplos de 64 KB. Las unidades de procesamiento se especifican a través de Azure Portal o de las API de administración de Events Hubs.

Si necesita más días de retención, tenga en cuenta el nivel Dedicado. Este nivel proporciona implementaciones de un solo inquilino que tienen requisitos estrictos. Esta oferta crea un clúster basado en unidades de capacidad y no depende de las unidades de rendimiento. El nivel Estándar también se factura en función de los eventos de entrada y las unidades de rendimiento.

Para más información, consulte Precios de Event Hubs.

Consideraciones sobre los costos de Azure Databricks

Azure Databricks proporciona el nivel Estándar y el nivel Premium, que admiten tres cargas de trabajo. Esta arquitectura de referencia implementa un área de trabajo de Azure Databricks en el nivel Premium.

Las cargas de trabajo de ingeniería de datos deben ejecutarse en un clúster de trabajos. Los ingenieros de datos usan clústeres para compilar y realizar trabajos. Las cargas de trabajo de análisis de datos deben ejecutarse en un clúster de uso completo y están diseñadas para que los científicos de datos exploren, visualicen, manipulen y compartan datos e información de forma interactiva.

Azure Databricks proporciona varios modelos de precios.

  • Plan de pago por uso

    Se le facturan las máquinas virtuales (VM) aprovisionadas en clústeres y unidades de Azure Databricks (DTU) en función de la instancia de máquina virtual elegida. Un DBU es una unidad de capacidad de procesamiento que Azure factura por uso por segundo. El consumo de DBU depende del tamaño y el tipo de instancia que se ejecuta en Azure Databricks. Los precios dependen de la carga de trabajo y el nivel elegidos.

  • Plan de compra previa

    Se compromete a las DTU como unidades de confirmación de Azure Databricks durante uno o tres años para reducir el costo total de propiedad durante ese período de tiempo en comparación con el modelo de pago por uso.

Para más información, consulte Precios de Azure Databricks.

Consideraciones sobre los costos de Azure Cosmos DB

En esta arquitectura, el trabajo de Azure Databricks escribe una serie de registros en Azure Cosmos DB. Se le cobra por la capacidad que reserva, que se mide en Unidades de solicitud por segundo (RU/s). Esta capacidad se usa para realizar operaciones de inserción. La unidad de facturación es de 100 RU/s por hora. Por ejemplo, el costo de escribir elementos de 100 KB es de 50 RU/s.

Para las operaciones de escritura, configure la capacidad suficiente para admitir el número de escrituras necesarias por segundo. Puede aumentar el rendimiento aprovisionado mediante el portal o la CLI de Azure antes de realizar operaciones de escritura y, a continuación, reducir el rendimiento una vez completadas esas operaciones. El rendimiento del período de escritura es la suma del rendimiento mínimo necesario para los datos específicos y el rendimiento necesario para la operación de inserción. Este cálculo supone que no hay ninguna otra carga de trabajo en ejecución.

Ejemplo de análisis de costos

Supongamos que configura un valor de rendimiento de 1000 RU/s en un contenedor y lo ejecuta continuamente durante 30 días, lo que equivale a 720 horas.

El contenedor se factura a 10 unidades de 100 RU/s por hora por cada hora. Diez unidades a $0,008 (por 100 RU/s por hora) se cobran a $0,08 por hora.

Durante 720 horas o 7.200 unidades (de 100 RU), se le facturan 57,60 USD por mes.

El almacenamiento también se factura por cada GB que se usa para los datos almacenados y el índice. Para más información, consulte el modelo de precios de Azure Cosmos DB.

Use la calculadora de capacidad de Azure Cosmos DB para obtener una estimación rápida del costo de la carga de trabajo.

Excelencia operativa

La excelencia operativa abarca los procesos de operaciones que implementan una aplicación y lo mantienen en ejecución en producción. Para obtener más información, vea Lista de comprobación de revisión de diseño para la excelencia operativa.

Supervisión

Azure Databricks se basa en Apache Spark. Tanto Azure Databricks como Apache Spark usan Apache Log4j como biblioteca estándar para el registro. Además del registro predeterminado que proporciona Apache Spark, puede implementar el registro en Log Analytics. Para obtener más información, consulte Supervisión de Azure Databricks.

A medida que la com.microsoft.pnp.TaxiCabReader clase procesa los mensajes de carreras y tarifas, es posible que un mensaje tenga un formato incorrecto y, por tanto, no sea válido. En un entorno de producción, es importante analizar estos mensajes con formato incorrecto para identificar un problema con los orígenes de datos para que se pueda corregir rápidamente para evitar la pérdida de datos. La com.microsoft.pnp.TaxiCabReader clase registra un acumulador de Apache Spark que realiza un seguimiento del número de registros de tarifas con formato incorrecto y registros de carreras:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark usa la biblioteca Dropwizard para enviar métricas. Algunos de los campos nativos de métricas dropwizard no son compatibles con Log Analytics, por lo que esta arquitectura de referencia incluye un receptor y un reportera personalizados de Dropwizard. Da formato a las métricas en el formato que Log Analytics espera. Cuando Apache Spark informa de las métricas, también se envían las métricas personalizadas de los datos de carreras y tarifas con formato incorrecto

Puede usar las siguientes consultas de ejemplo en el área de trabajo de Log Analytics para supervisar el funcionamiento del trabajo de streaming. El argumento ago(1d) de cada consulta devuelve todos los registros que se generaron en el último día. Puede ajustar este parámetro para ver un período de tiempo diferente.

Excepciones registradas durante la operación de consulta de flujo

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Acumulación de datos de carrera y tarifas con formato incorrecto

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Operación de trabajo a lo largo del tiempo

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Organización e implementaciones de recursos

  • Cree grupos de recursos independientes para entornos de producción, desarrollo y pruebas. Los grupos de recursos independientes facilitan la administración de implementaciones, la eliminación de implementaciones de prueba y la asignación de derechos de acceso.

  • Use la plantilla de Azure Resource Manager para implementar los recursos de Azure según el proceso de infraestructura como código. Mediante el uso de plantillas, puede automatizar implementaciones con servicios de Azure DevOps u otras soluciones de integración continua y entrega continua (CI/CD).

  • Coloque cada carga de trabajo en una plantilla de implementación independiente y almacene los recursos en los sistemas de control de código fuente. Puede implementar las plantillas en conjunto o por separado como parte de un proceso de CI/CD. Este enfoque simplifica el proceso de automatización.

    En esta arquitectura, Event Hubs, Log Analytics y Azure Cosmos DB se identifican como una sola carga de trabajo. Estos recursos se incluyen en una sola plantilla de Azure Resource Manager.

  • Considere la posibilidad de almacenar provisionalmente las cargas de trabajo. Implemente en varias fases y ejecute comprobaciones de validación en cada fase antes de pasar a la siguiente fase. De este modo, puede controlar cómo insertar actualizaciones en los entornos de producción y minimizar los problemas de implementación imprevistos.

    En esta arquitectura, hay varias fases de implementación. Considere la posibilidad de crear una canalización de Azure DevOps y agregar esas fases. Puede automatizar las siguientes fases:

    • Inicie un clúster de Azure Databricks.
    • Configure la CLI de Azure Databricks.
    • Instale las herramientas de Scala.
    • Agregue los secretos de Azure Databricks.

    Considere la posibilidad de escribir pruebas de integración automatizadas para mejorar la calidad y confiabilidad del código de Azure Databricks y su ciclo de vida.

Paso siguiente