使用 Azure Databricks 進行串流處理

Azure Cosmos DB
Azure Databricks
Azure 事件中樞
Azure Log Analytics
Azure 監視器

此參考架構會顯示端對端串流處理管線。 這種類型的管線有四個階段:內嵌、進程、存放區及分析和報告。 在此參考架構中,管線會擷取來自兩個來源的數據、在每個數據流的相關記錄上執行聯結、擴充結果,以及實時計算平均值。 結果會儲存以供進一步分析。

GitHub logoGitHub提供此架構的參考實作。

架構

Diagram showing a reference architecture for stream processing with Azure Databricks.

下載此架構的 Visio 檔案

工作流程

架構包含下列元件:

數據源。 在此架構中,有兩個數據源會即時產生數據流。 第一個數據流包含車程資訊,第二個數據流包含車資資訊。 參考架構包含仿真的數據產生器,可從一組靜態檔案讀取,並將數據推送至事件中樞。 實際應用程式中的數據源會是計程車上安裝的裝置。

Azure 事件中樞。 事件中 樞是事件擷取服務。 此架構會使用兩個事件中樞實例,每個數據源各一個。 每個數據源都會將數據流傳送至相關聯的事件中樞。

Azure Databricks。 Databricks 是針對 Microsoft Azure 雲端服務平台優化的 Apache Spark 型分析平臺。 Databricks 可用來將計程車車程和車資數據相互關聯,以及擴充與 Databricks 文件系統中儲存的鄰里數據相互關聯數據。

Azure Cosmos DB。 Azure Databricks 作業的輸出是一系列記錄,這些記錄會 寫入 Azure Cosmos DB for Apache Cassandra。 使用適用於 Apache Cassandra 的 Azure Cosmos DB,因為它支援時間序列數據模型化。

  • 適用於 Azure Cosmos DB 的 Azure Synapse Link 可讓您使用 Azure Synapse 工作區提供的兩個分析引擎:SQL 無伺服器Spark 集區,對交易式工作負載執行近乎即時的分析,而不需要對交易式工作負載產生任何效能或成本影響。

Azure Log Analytics。 Azure 監視器所收集的應用程式記錄資料會儲存在 Log Analytics 工作區中。 Log Analytics 查詢可用來分析和可視化計量,並檢查記錄訊息以識別應用程式內的問題。

替代項目

  • Synapse Link 是 Microsoft 慣用的解決方案,可在 Azure Cosmos DB 數據上進行分析。

案例詳細資料

案例:計程車公司會收集每個計程車車程的相關數據。 在此案例中,我們假設有兩個不同的裝置傳送數據。 計程車有一個計量,可傳送每個車程的相關信息 -- 持續時間、距離和上下車地點。 個別裝置會接受客戶的付款,並傳送有關票價的數據。 為了找出騎車趨勢,計程車公司想要實時計算每個街區每英里行駛的平均小費。

潛在的使用案例

此解決方案已針對零售產業優化。

資料擷取

為了模擬數據源,此參考架構會使用 紐約市計程車數據 數據集[1]。 此數據集包含紐約市在四年內的計程車車程數據(2010 - 2013 年)。 其中包含兩種類型的記錄:車程數據和票價數據。 車程數據報含車程持續時間、車程距離和上車和下車位置。 票價數據報括票價、稅金和小費金額。 這兩種記錄類型的常見欄位包括獎章號碼、駭客授權和廠商標識碼。 這三個字段會唯一識別計程車加上司機。 數據會以 CSV 格式儲存。

[1] 多諾萬, 布賴恩;Work, Dan (2016): 紐約市計程車車程數據 (2010-2013 年)。 伊利諾伊大學厄巴-尚佩恩大學。 https://doi.org/10.13012/J8PN93H8

數據產生器是 .NET Core 應用程式,可讀取記錄,並將其傳送至 Azure 事件中樞。 產生器會以 JSON 格式傳送車程數據,並以 CSV 格式傳送車資數據。

事件中樞會使用 分割 區來分割數據。 數據分割可讓取用者平行讀取每個分割區。 當您將數據傳送至事件中樞時,您可以明確指定分割區索引鍵。 否則,記錄會以迴圈配置資源的方式指派給分割區。

在此案例中,車程數據和車資數據最後應該會包含指定計程車的相同分割區標識符。 這可讓 Databricks 在將兩個數據流相互關聯時套用平行處理原則的程度。 車程數據分割 n 中的記錄會比對費用數據分割 n 中的記錄。

Diagram of stream processing with Azure Databricks and Event Hubs.

下載此架構的 Visio 檔案

在數據產生器中,這兩種記錄類型的通用數據模型都有 屬性PartitionKey,其為 、 HackLicenseVendorIdMedallion串連。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

當傳送至事件中樞時,這個屬性可用來提供明確的分割區索引鍵:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

事件中樞

事件中樞的輸送量容量是以輸送量單位來測量。 您可以藉由啟用 自動擴充來自動調整事件中樞,以根據流量自動調整輸送量單位,最多可達到設定的最大值。

串流處理

在 Azure Databricks 中,數據處理是由作業執行。 作業會指派給叢集並在叢集上執行。 作業可以是以 Java 或 Spark 筆記本撰寫的自定義程式代碼。

在此參考架構中,作業是 Java 封存,其中包含以 Java 和 Scala 撰寫的類別。 指定 Databricks 作業的 Java 封存時,會指定 類別以供 Databricks 叢集執行。 在這裡,com.microsoft.pnp.TaxiCabReader 類別的主要方法包含數據處理邏輯。

從兩個事件中樞實例讀取數據流

數據處理邏輯會使用 Spark 結構化串流 從兩個 Azure 事件中樞實例讀取:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

使用鄰里資訊擴充數據

車程數據報含上下車位置的緯度和經度座標。 雖然這些座標很實用,但不容易用於分析。 因此,此數據會以從shapefile讀取的鄰里數據擴充。

shapefile 格式是二進位格式,而且不容易剖析,但 GeoTools 連結庫會提供使用 shapefile 格式的地理空間數據工具。 此連結庫會用於 com.microsoft.pnp.GeoFinder 類別,以根據取貨和下車座標來判斷鄰里名稱。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

聯結車程和票價數據

首先會轉換車程和票價數據:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

然後,車程數據會與票價數據聯結:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

處理數據並插入 Azure Cosmos DB

每個鄰里的平均票價金額會針對指定的時間間隔計算:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

接著會插入 Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

考量

這些考慮會實作 Azure Well-Architected Framework 的支柱,這是一組指導原則,可用來改善工作負載的品質。 如需詳細資訊,請參閱 Microsoft Azure Well-Architected Framework

安全性

安全性可提供針對蓄意攻擊和濫用寶貴數據和系統的保證。 如需詳細資訊,請參閱 安全性要素概觀。

使用系統管理員控制台控制 Azure Databricks 工作區的存取權。 系統管理員主控台包含新增使用者、管理用戶許可權,以及設定單一登錄的功能。 工作區、叢集、作業和數據表的訪問控制也可以透過系統管理員主控台進行設定。

管理秘密

Azure Databricks 包含用來儲存秘密的秘密存放區,包括 連接字串、存取密鑰、使用者名稱和密碼。 Azure Databricks 秘密存放區內的秘密會依範圍進行分割:

databricks secrets create-scope --scope "azure-databricks-job"

秘密會在範圍層級新增:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

注意

您可以使用 Azure 金鑰保存庫 支援的範圍,而不是原生 Azure Databricks 範圍。 若要深入瞭解,請參閱 Azure 金鑰保存庫 支援的範圍

在程序代碼中,秘密是透過 Azure Databricks 秘密公用程式來存取。

監視

Azure Databricks 是以 Apache Spark 為基礎,兩者都使用 log4j 作為記錄的標準連結庫。 除了 Apache Spark 所提供的預設記錄之外,您還可以遵循監視 Azure Databricks 一文 ,將記錄實作至 Azure Log Analytics

由於 com.microsoft.pnp.TaxiCabReader 類別會處理車程和車資訊息,因此可能其中一個可能格式不正確,因此無效。 在生產環境中,請務必分析這些格式不正確的訊息,以找出數據源的問題,以便快速修正,以防止數據遺失。 com.microsoft.pnp.TaxiCabReader 類別會註冊 Apache Spark 累積器,以追蹤格式不正確的票價和車程記錄數目:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark 會使用 Dropwizard 連結庫來傳送計量,而某些原生 Dropwizard 計量字段與 Azure Log Analytics 不相容。 因此,此參考架構包含自定義Dropwizard接收和記者。 它會以 Azure Log Analytics 預期的格式格式化計量。 當 Apache Spark 報告計量時,也會傳送格式錯誤的車程和票價數據的自定義計量。

以下是您可以在 Azure Log Analytics 工作區中用來監視串流作業執行的範例查詢。 每個查詢中的 自變數 ago(1d) 都會傳回過去一天產生的所有記錄,並可調整以檢視不同的時間週期。

數據流查詢執行期間記錄的例外狀況

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

格式錯誤的票價和車程數據的累積

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

一段時間的作業執行

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

如需詳細資訊,請參閱 監視 Azure Databricks

DevOps

  • 為生產環境、開發和測試環境建立個別的資源群組。 不同的資源群組可讓您更輕鬆地管理部署、刪除測試部署,以及指派訪問許可權。

  • 使用 Azure Resource Manager 範本 ,在基礎結構即程式代碼 (IaC) 程序之後部署 Azure 資源。 透過範本,使用 Azure DevOps Services 或其他 CI/CD 解決方案將部署自動化會比較容易。

  • 將每個工作負載放在個別的部署範本中,並將資源儲存在原始檔控制系統中。 您可以將範本一起或個別部署為 CI/CD 程式的一部分,讓自動化程式更容易。

    在此架構中,Azure 事件中樞、Log Analytics 和 Azure Cosmos DB 會識別為單一工作負載。 這些資源包含在單一 ARM 範本中。

  • 請考慮暫存工作負載。 部署至各種階段,並在每個階段執行驗證檢查,再移至下一個階段。 如此一來,您就可以以高度控制的方式將更新推送至生產環境,並將未預期的部署問題降到最低。

    在此架構中,有多個部署階段。 請考慮建立 Azure DevOps Pipeline 並新增這些階段。 以下是一些您可以自動化的階段範例:

    • 啟動 Databricks 叢集
    • 設定 Databricks CLI
    • 安裝 Scala 工具
    • 新增 Databricks 秘密

    此外,請考慮撰寫自動化整合測試,以改善 Databricks 程式代碼及其生命週期的品質和可靠性。

  • 請考慮使用 Azure 監視器 來分析串流處理管線的效能。 如需詳細資訊,請參閱 監視 Azure Databricks

如需詳細資訊,請參閱 Microsoft Azure Well-Architected Framework 中的 DevOps 一節。

成本最佳化

成本優化是考慮如何減少不必要的費用,並提升營運效率。 如需詳細資訊,請參閱 成本優化要素概觀。

使用 Azure 定價計算機來預估成本。 以下是此參考架構中使用的服務一些考慮。

事件中樞

此參考架構會在標準層中部署事件中樞。 定價模式是以輸送量單位、輸入事件和擷取事件為基礎。 「輸入事件」是 64 KB 或更小的資料單位。 較大訊息以 64 KB 的倍數計費。 您可以透過 Azure 入口網站 或事件中樞管理 API 來指定輸送量單位。

如果您需要更多保留天數,請考慮 專用 層。 此層提供需求最苛刻的單一租使用者部署。 此供應專案會根據未受輸送量單位約束的容量單位 (CU) 建置叢集。

標準層也會根據輸入事件和輸送量單位來計費。

如需事件中樞定價的相關信息,請參閱事件中 樞定價

Azure Databricks

Azure Databricks 提供兩層 Standard進階版,每個都支援三個工作負載。 此參考架構會在 進階版 層中部署 Azure Databricks 工作區。

資料工程師 和 資料工程師 Light 工作負載是讓數據工程師建置和執行作業。 數據分析工作負載適用於數據科學家,以互動方式探索、可視化、操作及共用數據和深入解析。

Azure Databricks 提供許多定價模式。

  • 隨用隨付方案

    系統會根據所選取的 VM 實例,針對叢集中佈建的虛擬機和 Databricks Units (DBU)支付費用。 DBU 是處理能力的單位,以每秒使用量計費。 DBU 耗用量取決於執行 Azure Databricks 的執行個體大小和類型。 定價將取決於選取的工作負載和層級。

  • 預先購買方案

    您認可 Azure Databricks Units (DBU) 作為 Databricks 認可單位 (DBCU) 一或三年。 相較於隨用隨付模型,最多可節省 37%。

如需詳細資訊,請參閱 Azure Databricks 定價

Azure Cosmos DB

在此架構中,Azure Databricks 作業會將一系列記錄寫入 Azure Cosmos DB。 系統會向您保留的容量收費,以每秒要求單位 (RU/秒) 表示,用來執行插入作業。 計費單位為每小時 100 RU/秒。 例如,撰寫 100 KB 專案的成本為 50 RU/秒。

針對寫入作業,布建足夠的容量,以支援每秒所需的寫入數目。 您可以先使用入口網站或 Azure CLI 來增加布建的輸送量,再執行寫入作業,然後在完成這些作業之後減少輸送量。 寫入期間的輸送量是指定數據所需的最小輸送量,加上插入作業所需的輸送量,假設沒有其他工作負載正在執行。

範例成本分析

假設您在容器上設定輸送量值為 1,000 RU/秒。 它已部署 24 小時 30 天,總共 720 小時。

容器每小時會以每小時 100 RU/秒的單位計費。 每小時 10 個單位為 $0.008(每 100 RU/秒)收取每小時 $0.08 美元。

對於 720 小時或 7,200 個單位(100 RU),您每月的帳單為 57.60 美元。

儲存體 也會針對儲存數據和索引所使用的每個 GB 計費。 如需詳細資訊,請參閱 Azure Cosmos DB 定價模型

使用 Azure Cosmos DB 容量計算機來快速估計工作負載成本。

如需詳細資訊,請參閱 Microsoft Azure Well-Architected Framework 中的成本一節。

部署此案例

若要部署並執行參考實作,請遵循 GitHub 自述檔中的步驟。

下一步