你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure Databricks 进行流处理

Azure Cosmos DB
Azure Databricks
Azure 事件中心
Azure Log Analytics
Azure Monitor

本参考体系结构演示一个端到端流处理管道。 此类管道包括四个阶段:引入、处理、存储,以及分析和报告。 本参考体系结构中的管道从两个源引入数据,针对每个流中的相关记录执行联接,丰富结果,然后实时计算平均值。 将存储结果以供进一步分析。

GitHub logoGitHub 中提供了本体系结构的参考实现。

体系结构

Diagram showing a reference architecture for stream processing with Azure Databricks.

下载此体系结构的 Visio 文件

工作流

该体系结构包括以下组件:

数据源。 在此体系结构中,有两个数据源实时生成数据流。 第一个流包含行程信息,第二个流包含费用信息。 该参考体系结构包含一个模拟的数据生成器,该生成器读取一组静态文件,并将数据推送到事件中心。 实际应用中的数据源是安装在出租车内部的设备。

Azure 事件中心事件中心是一个事件引入服务。 此体系结构使用两个事件中心实例,每个数据源各对应一个。 每个数据源将数据流发送到关联的事件中心。

Azure DatabricksDatabricks 是基于 Apache Spark 的分析平台,已针对 Microsoft Azure 云服务平台进行优化。 Databricks 用于关联出租车行程数据和费用数据,以及使用 Databricks 文件系统中存储的周边区域数据丰富关联的数据。

Azure Cosmos DB。 Azure Databricks 作业的输出是写入到 Azure Cosmos DB for Apache Cassandra 的一系列记录。 使用 Azure Cosmos DB for Apache Cassandra 是因为它支持时序数据建模。

  • Azure Synapse Link for Azure Cosmos DB,可以使用 Azure Synapse 工作区中提供的两个分析引擎 (SQL 无服务器Spark 池)对 Azure Cosmos DB 中的操作数据运行近实时分析,“而不会影响事务工作负荷的性能或成本”

Azure Log AnalyticsAzure Monitor 收集的应用程序日志数据存储在 Log Analytics 工作区中。 可以使用 Log Analytics 来分析和可视化指标,以及检查日志消息以确定应用程序中的问题。

备选方法

  • Synapse Link 是 Microsoft 首选的基于 Azure Cosmos DB 数据的分析解决方案。

方案详细信息

场景:某家出租车公司需要收集有关出租车的每次行程的数据。 对于此场景,我们假设有两个不同的设备在发送数据。 出租车的计量表发送有关每次行程的信息:持续时间、距离以及上车和下车地点。 有一个单独的设备接受客户的付款,并发送有关费用的数据。 为了绘制行程趋势图,该出租车公司想要实时计算每个周边区域的每英里平均小费。

可能的用例

此解决方案针对零售行业进行了优化。

数据引入

为了模拟数据源,此参考体系结构使用了纽约市出租车数据数据集[1]。 此数据集包含纽约市过去四年(2010 年 - 2013 年)的出租车行程数据。 其包含两种类型的记录:行程数据和费用数据。 行程数据包括行程持续时间、行程距离以及上车和下车地点。 费用数据包括乘车费、税费和小费金额。 这两种记录类型中的通用字段包括牌照号、出租车驾照和供应商 ID。 这三个字段相结合,唯一标识了出租车和驾驶员。 数据以 CSV 格式存储。

[1] Donovan, Brian;Work, Dan (2016):纽约市出租车行程数据 (2010-2013)。 伊利诺伊大学厄巴纳-香槟分校。 https://doi.org/10.13012/J8PN93H8

数据生成器是一个读取记录并将其发送到 Azure 事件中心的 .NET Core 应用程序。 该生成器发送 JSON 格式的行程数据以及 CSV 格式的费用数据。

事件中心使用分区将数据分段。 使用者可以通过分区功能并行读取每个分区。 将数据发送到事件中心时,可以显式指定分区键。 否则,记录将以循环方式分配到分区。

在此场景中,给定出租车的行程数据和费用数据最终会获得相同的分区 ID。 这样,在关联两个流时,Databricks 可以应用某种并行度。 行程数据分区 n 中的记录将与费用数据分区 n 中的记录进行匹配。

Diagram of stream processing with Azure Databricks and Event Hubs.

下载此体系结构的 Visio 文件

在数据生成器中,这两种记录类型的通用数据模型具有 PartitionKey 属性,该属性是 MedallionHackLicenseVendorId 的串联形式。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

将数据发送到事件中心时,会使用此属性来提供显式分区键:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

事件中心

事件中心的吞吐量容量以吞吐量单位来度量。 可以通过启用自动扩充来自动缩放事件中心。自动扩充可以根据流量,最高将吞吐量单位自动扩展到配置的上限。

流处理

在 Azure Databricks 中,数据处理由某个作业执行。 该作业分配到群集并在其上运行。 作业可以是以 Java 编写的自定义代码,或 Spark 笔记本

在本参考体系结构中,作业是一个 Java 存档,其中包含以 Java 和 Scala 编写的类。 为 Databricks 作业指定 Java 存档时,将指定该类,让 Databricks 群集执行。 此处,com.microsoft.pnp.TaxiCabReader 类的 main 方法包含数据处理逻辑。

从两个事件中心实例读取流

数据处理逻辑使用 Spark 结构化流从两个 Azure 事件中心实例读取数据:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

使用周边区域信息丰富数据

行程数据包括上车和下车地点的纬度与经度坐标。 尽管这些坐标很有用,但不可轻松用于分析。 因此,已使用从形状文件读取的周边区域数据丰富了这些数据。

形状文件采用二进制格式且不可轻松分析,但 GeoTools 库提供了相应的工具用于处理使用形状文件格式的地理空间数据。 com.microsoft.pnp.GeoFinder 类中使用了此库来根据上车和下车地点坐标确定周边区域名称。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

联接行程数据和费用数据

首先转换行程数据和费用数据:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

然后将行程数据与费用数据相联接:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

处理数据并将数据插入到 Azure Cosmos DB 中

按给定的时间间隔计算每个周边区域的平均费用金额:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

然后将此金额插入到 Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

注意事项

这些注意事项实施 Azure 架构良好的框架的支柱原则,即一套可用于改善工作负载质量的指导原则。 有关详细信息,请参阅 Microsoft Azure 架构良好的框架

安全性

安全性针对蓄意攻击及滥用宝贵数据和系统提供保障措施。 有关详细信息,请参阅安全性支柱概述

使用管理员控制台控制对 Azure Databricks 工作区的访问。 管理员控制台包含添加用户、管理用户权限和设置单一登录的功能。 还可以通过管理员控制台设置工作区、群集、作业和表的访问控制。

管理机密

Azure Databricks 包含一个用于存储机密(包括连接字符串、访问密钥、用户名和密码)的机密存储。 Azure Databricks 机密存储中的机密按范围分区:

databricks secrets create-scope --scope "azure-databricks-job"

机密是在范围级别添加的:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

注意

可以使用基于 Azure Key Vault 的范围来取代本机 Azure Databricks 范围。 有关详细信息,请参阅基于 Azure Key Vault 的范围

在代码中,可以通过 Azure Databricks 机密实用工具访问机密。

监视

Azure Databricks 基于 Apache Spark,两者都使用 log4j 作为日志记录的标准库。 除了 Apache Spark 提供的默认日志记录之外,还可以按照监视 Azure Databricks 一文实现对 Azure Log Analytics 的日志记录。

com.microsoft.pnp.TaxiCabReader 类将处理行程和费用消息,如果其中一种消息的格式不当,则该消息无效。 在生产环境中,必须分析这些格式不当的消息来识别数据源的问题,以便可以快速解决问题并防止数据丢失。 com.microsoft.pnp.TaxiCabReader 类注册 Apache Spark Accumulator,该程序可以跟踪格式不当的费用记录和行程记录的数目:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark 使用 Dropwizard 库发送指标;某些本机 Dropwizard 指标字段与 Azure Log Analytics 不兼容。 因此,本参考体系结构包含了自定义的 Dropwizard 接收器和报告器。 它会根据 Azure Log Analytics 的预期设置指标格式。 当 Apache Spark 报告指标时,也会发送格式不当的行程数据和费用数据的自定义指标。

下面是可在 Azure Log Analytics 工作区中使用的示例查询,用于监视流式处理作业的执行。 每个查询中的参数 ago(1d) 将返回过去一天生成的所有记录,并可以进行调整以查看不同的时间段。

流查询执行期间记录的异常

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

格式不当的费用和行程数据的累积数目

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

随时间推移的作业执行

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

有关详细信息,请参阅监视 Azure Databricks

DevOps

  • 为生产、开发和测试环境创建单独的资源组。 使用单独的资源组可以更方便地管理部署、删除测试部署,以及分配访问权限。

  • 按照基础结构即代码 (IaC) 流程,使用 Azure 资源管理器模板部署 Azure 资源。 通过模板,可更轻松地使用 Azure DevOps Services 或其他 CI/CD 解决方案自动执行部署。

  • 将每个工作负载放在单独的部署模板中,并将资源存储在源代码管理系统中。 可以在 CI/CD 过程中统一或者逐个部署这些模板,以简化自动化流程。

    在此体系结构中,Azure 事件中心、Log Analytics 和 Azure Cosmos DB 都标识为单个工作负载。 这些资源包含在单个 ARM 模板中。

  • 请考虑暂存工作负载。 部署到各个阶段,并在每个阶段运行验证检查,然后再移动到下一阶段。 这样便能以高度受控的方式将更新推送到生产环境,并最大程度地减少意外的部署问题。

    在此体系结构中,有多个部署阶段。 请考虑创建 Azure DevOps 管道并添加这些阶段。 下面是可自动执行的一些阶段示例:

    • 启动 Databricks 群集
    • 配置 Databricks CLI
    • 安装 Scala 工具
    • 添加 Databricks 机密

    此外,请考虑编写自动化集成测试,以提高 Databricks 代码及其生命周期的质量和可靠性。

  • 请考虑使用 Azure Monitor 分析流处理管道的性能。 有关详细信息,请参阅监视 Azure Databricks

有关详细信息,请参阅 Microsoft Azure 架构良好的框架中的“DevOps”部分。

成本优化

成本优化是关于寻找减少不必要的费用和提高运营效率的方法。 有关详细信息,请参阅成本优化支柱概述

使用 Azure 定价计算器估算成本。 下面是此参考体系结构中使用的服务的一些注意事项。

事件中心

此参考体系结构在标准层中部署事件中心。 定价模型基于吞吐量单位、入口事件和捕获事件。 入口事件为小于等于 64 KB 的数据单位。 较大的消息按 64 KB 的倍数计费。 可以通过 Azure 门户或事件中心管理 API 指定吞吐量单位。

如果需要更多保留天数,请考虑使用专用层。 此层提供具有最苛刻要求的单租户部署。 此产品/服务会基于容量单位 (CU) 构建群集,而不受吞吐量单位的限制。

标准层也根据入口事件和吞吐量单位计费。

有关事件中心定价的信息,请参阅事件中心定价

Azure Databricks

Azure Databricks 提供两个层(标准层和高级层),每个层都支持三个工作负载。 此参考体系结构在高级层中部署 Azure Databricks 工作区。

数据工程和轻量数据工程工作负载供数据工程师生成和执行作业。 数据分析工作负载旨在让数据科学家以交互方式浏览、可视化、操作和共享数据和见解。

Azure Databricks 提供了许多定价模型。

  • 即用即付计划

    根据所选 VM 实例,对群集中预配的虚拟机 (VM) 和 Databricks 单位 (DBU) 进行计费。 DBU 是处理能力的单位,按每秒使用量计费。 DBU 使用量取决于运行 Azure Databricks 的实例的大小和类型。 定价将取决于所选工作负载和层。

  • 预购计划

    以 Databricks 提交单位 (DBCU) 的形式预购 Azure Databricks 单位 (DBU) 1 或 3 年。 与即用即付模型相比,最多可节省 37% 的费用。

有关详细信息,请参阅定价 Azure Databricks 定价

Azure Cosmos DB

在此体系结构中,Azure Databricks 作业将一系列记录写入 Azure Cosmos DB。 对用于执行插入操作的预留容量(RU/秒)收费。 计费单位为每小时 100 RU/秒。 例如,写入 100 KB 项的成本为 50 RU/秒。

对于写入操作,预配足够的容量以支持每秒所需的写入次数。 可在执行写入操作之前使用门户或 Azure CLI 来增加预配置的吞吐量,然后在操作完成后降低吞吐量。 写入期间的吞吐量是给定数据所需的最小吞吐量,加上插入操作所需的吞吐量(假设没有其他工作负载正在运行)。

成本分析示例

假设在容器上配置了 1,000 RU/秒的吞吐量。 它部署了 30 天,每天 24 小时,总共 720 小时。

该容器每小时按 10 个单位 100 RU/秒计费。 每 100 RU/秒每小时 0.008 美元,每小时 10 个单位,共收取 0.08 美元。

对于 720 小时或 7,200 个单位 (100 RU),每月账单为 57.60 美元。

存储费用按存储数据和索引所用的每 GB 计算。 有关详细信息,请参阅 Azure Cosmos DB 定价模型

使用 Azure Cosmos DB 容量计算器快速估算工作负载成本。

有关详细信息,请参阅 Microsoft Azure 架构良好的框架中的“成本”部分。

部署此方案

若要部署并运行参考实现,请按 GitHub 自述文件中的步骤操作。

后续步骤