Azure Databricks によるストリーム処理

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

この参照アーキテクチャでは、エンド ツー エンドのストリーム処理パイプラインを示します。 この種類のパイプラインには、取り込み、処理、格納、および分析とレポート作成の 4 つの段階があります。 この参照アーキテクチャでは、パイプラインは、2 つのソースからデータを取り込み、各ストリームの関連するレコードに対して結合を実行し、結果を強化させ、リアルタイムで平均を計算します。 結果が保存され、さらに詳しい分析が行われます。

GitHub logoこのアーキテクチャの参照実装は、GitHub で入手できます。

アーキテクチャ

Diagram showing a reference architecture for stream processing with Azure Databricks.

このアーキテクチャの Visio ファイルをダウンロードします。

ワークフロー

アーキテクチャは、次のコンポーネントで構成されています。

データ ソース。 このアーキテクチャには、リアルタイムでデータ ストリームを生成する 2 つのデータ ソースがあります。 1 つ目のストリームには乗車情報が含まれ、2 つ目のストリームには料金情報が含まれます。 参照アーキテクチャには、一連の静的ファイルから読み取り、データを Event Hubs にプッシュするシミュレートされたデータ ジェネレーターが含まれています。 実際のアプリケーションのデータ ソースは、タクシーに設置されたデバイスになります。

Azure Event HubsEvent Hubs はイベント取り込みサービスです。 このアーキテクチャでは、2 つのイベント ハブ インスタンス (データ ソースごとに 1 つ) を使用します。 各データ ソースは、関連付けられたイベント ハブにデータ ストリームを送信します。

Azure DatabricksDatabricks は、Microsoft Azure クラウド サービス プラットフォーム用に最適化された Apache Spark ベースの分析プラットフォームです。 Databricks を使用して、タクシーの乗車データと料金データを関連付け、さらに、その関連付けられたデータを、Databricks ファイル システムに格納されている地域データで強化します。

Azure Cosmos DB Azure Databricks ジョブの出力は一連のレコードであり、Azure Cosmos DB for Apache Cassandra に書き込まれます。 Azure Cosmos DB for Apache Cassandra が使用されるのは、時系列データ モデリングをサポートしているためです。

  • Azure Synapse Link for Azure Cosmos DB を使用すると、Azure Synapse ワークスペースから利用できる SQL Serverless および Spark Pools の 2 つの分析エンジンを使用して、トランザクション ワークロードにパフォーマンスまたはコストの影響を与えることなく、Azure Cosmos DB のオペレーショナル データに対して凖リアルタイムの分析を実行できます。

Azure Log AnalyticsAzure Monitor で収集されたアプリケーション ログ データは、Log Analytics ワークスペースに保存されます。 Log Analytics クエリを使用してメトリックを分析および視覚化し、ログ メッセージを検査してアプリケーション内の問題を特定できます。

代替

  • Synapse Link は、Azure Cosmos DB データに対して分析を行うための Microsoft の推奨ソリューションです。

シナリオの詳細

シナリオ:タクシー会社が各乗車に関するデータを収集しています。 このシナリオでは、データを送信する 2 つのデバイスがあることを想定しています。 タクシーには、各乗車の情報 (走行時間、距離、乗車場所と降車場所) を送信するメーターがあります。 別のデバイスでは、乗客からの支払いを受け付け、料金に関するデータを送信します。 利用者数の傾向をつかむために、このタクシー会社では、各地で走行 1 マイルあたりの平均チップをリアルタイムで計算したいと考えています。

考えられるユース ケース

このソリューションは、小売業界向けに最適化されています。

データ インジェスト

データ ソースをシミュレートするために、この参照アーキテクチャでは New York City Taxi Data データセット[1] を使用します。 このデータセットには、ニューヨーク市の 4 年間 (2010 年から 2013 年) のタクシー乗車に関するデータが含まれています。 乗車データと料金データの 2 種類のレコードがあります。 乗車データには、走行時間、乗車距離、乗車場所と降車場所が含まれます。 料金データには、料金、税、チップの金額が含まれます。 この 2 種類のレコードの共通フィールドには、営業許可番号、タクシー免許、ベンダー ID があります。 この 3 つのフィールドを組み合わせて、タクシーと運転手が一意に識別されます。 データは CSV 形式で保存されます。

[1] Donovan, Brian; Work, Dan (2016):New York City Taxi Trip Data (2010-2013). イリノイ大学アーバナシャンペーン校。 https://doi.org/10.13012/J8PN93H8

データ ジェネレーターは、レコードを読み取り、Azure Event Hubs に送信する .NET Core アプリケーションです。 ジェネレーターは、JSON 形式の乗車データと CSV 形式の料金データを送信します。

Event Hubs では、パーティションを使用してデータをセグメント化します。 複数のパーティションでは、コンシューマーは各パーティションを並列で読み取ることができます。 Event Hubs にデータを送信するときに、パーティション キーを明示的に指定できます。 それ以外の場合は、ラウンド ロビン方式でパーティションにレコードが割り当てられます。

このシナリオでは、特定のタクシーの乗車データと料金データは、最終的に同じパーティション ID を共有します。 これにより、Databricks は 2 つのストリームを関連付けるときに、ある程度の並列処理を適用できます。 乗車データのパーティション n 内のレコードは、料金データのパーティション n 内のレコードに対応します。

Diagram of stream processing with Azure Databricks and Event Hubs.

このアーキテクチャの Visio ファイルをダウンロードします。

データ ジェネレーターでは、両方のレコードの種類に対応した共通データ モデルに、MedallionHackLicenseVendorId を連結した PartitionKey プロパティがあります。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

このプロパティを使用して、Event Hubs への送信時に明示的なパーティション キーが提供されます。

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Event Hubs のスループット容量は、スループット ユニットで測定されます。 自動インフレを有効にすると、イベント ハブを自動スケーリングできます。自動インフレでは、トラフィックに基づいて、スループット ユニットが構成済みの最大値まで自動的にスケーリングされます。

ストリーム処理

Azure Databricks では、ジョブによってデータ処理が実行されます。 ジョブはクラスターに割り当てられ、そこで実行されます。 ジョブは、Java で記述されたカスタム コードか、Spark Notebook です。

この参照アーキテクチャでは、ジョブは、Java と Scala の両方で記述されたクラスを含む Java アーカイブです。 Databricks ジョブの Java アーカイブを指定する場合、実行対象のクラスは Databricks クラスターによって指定されます。 ここでは、com.microsoft.pnp.TaxiCabReader クラスの main メソッドに、データ処理ロジックが含まれています。

2 つのイベント ハブ インスタンスからのストリームの読み取り

データ処理ロジックでは、2 つの Azure イベント ハブ インスタンスからの読み取りに、Spark 構造化ストリーミングが使用されます。

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

地域情報によるデータの強化

乗車データには、乗車場所と降車場所の緯度と経度の座標が含まれています。 これらの座標は便利ですが、分析で使用するのは簡単ではありません。 したがって、このデータは、シェープファイルから読み取られる地域データで強化されます。

シェープファイルはバイナリ形式で、簡単には解析されませんが、GeoTools ライブラリには、シェープファイル形式を使用する地理空間データを対象としたツールが用意されています。 このライブラリは、乗車場所と降車場所の座標に基づいて地域名を判断するために、com.microsoft.pnp.GeoFinder クラスで使用されます。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

乗車データと料金データの結合

最初に、乗車データと料金データが変換されます。

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

次に、乗車データが、料金データと結合されます。

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

データの処理と Azure Cosmos DB への挿入

指定された期間について、平均料金が地域ごとに計算されます。

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

次に、これが Azure Cosmos DB に挿入されます。

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

考慮事項

以降の考慮事項には、ワークロードの品質向上に使用できる一連の基本原則である Azure "Well-Architected Framework" の要素が組み込まれています。 詳細については、「Microsoft Azure Well-Architected Framework」を参照してください。

セキュリティ

セキュリティは、重要なデータやシステムの意図的な攻撃や悪用に対する保証を提供します。 詳細については、「セキュリティの重要な要素の概要」を参照してください。

Azure Databricks ワークスペースへのアクセスは、管理者コンソールを使用して制御されます。 管理者コンソールには、ユーザーを追加する、ユーザーのアクセス許可を管理する、シングル サインオンを設定する、といった機能が含まれています。 ワークスペース、クラスター、ジョブ、およびテーブルのアクセス制御も、管理者コンソールで設定できます。

シークレットを管理する

Azure Databricks にはシークレット ストアがあり、これを使用して接続文字列、アクセス キー、ユーザー名、パスワードなどのシークレットを格納します。 Azure Databricks シークレット ストア内のシークレットは、スコープごとにパーティション分割されています。

databricks secrets create-scope --scope "azure-databricks-job"

シークレットは、スコープ レベルで追加されます。

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

注意

ネイティブの Azure Databricks スコープではなく、Azure Key Vault を実体とするスコープを使用できます。 詳細については、「Azure Key Vault-backed scopes (Azure Key Vault を実体とするスコープ)」を参照してください。

コードでは、Azure Databricks シークレット ユーティリティを介してシークレットにアクセスします。

監視

Azure Databricks は Apache Spark に基づいており、どちらも、ログ記録用の標準ライブラリとして log4j を使用しています。 Apache Spark によって提供される既定のログ記録に加えて、Azure Log Analytics へのログ記録を実装できます。詳細については、「Azure Databricks の監視」を参照してください。

com.microsoft.pnp.TaxiCabReader クラスでは、乗車メッセージと料金メッセージが処理されるため、いずれかの形式が正しくない可能性があり、これが原因で無効になることがあります。 運用環境では、形式に誤りがあるこれらのメッセージを分析して、データ ソースの問題を特定することが重要です。これにより、その問題を迅速に修正して、データ損失を防ぐことができます。 com.microsoft.pnp.TaxiCabReader クラスにより、形式に誤りがある料金レコードと乗車レコードの数を追跡する Apache Spark アキュムレータが登録されます。

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark は Dropwizard ライブラリを使用してメトリックを送信しますが、ネイティブ Dropwizard メトリック フィールドの中には、Azure Log Analytics と互換性がないものがあります。 このため、この参照アーキテクチャには、カスタム Dropwizard シンクおよびレポーターが含まれています。 これにより、メトリックは、Azure Log Analytics で予期される形式に書式設定されます。 Apache Spark によってメトリックがレポートされると、形式に誤りがある乗車データと料金データに対するカスタム メトリックも送信されます。

Azure Log Analytics ワークスペースでストリーミング ジョブの実行を監視するために使用できるクエリの例を次に示します。 各クエリの ago(1d) 引数は、最後の日に生成されたレコードをすべて返し、異なる期間を表示するために調整できます。

ストリーム クエリの実行中に記録された例外

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

形式に誤りがある料金データと乗車データの蓄積

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

経時的なジョブの実行

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

詳細については、「Azure Databricks の監視」を参照してください。

DevOps

  • 運用、開発、およびテスト環境それぞれに対して個別のリソース グループを作成してください。 個別のリソース グループにより、デプロイの管理、テスト デプロイの削除、およびアクセス権の割り当てが行いやすくなります。

  • Azure Resource Manager テンプレートを使用して、コードとしてのインフラストラクチャ (IaC) プロセスに従って Azure リソースをデプロイします。 テンプレートを使用すると、Azure DevOps Services またはその他の CI/CD ソリューションを使用したデプロイの自動化が簡単になります。

  • 各ワークロードを別々のデプロイ テンプレートに配置し、リソースをソース管理システムに格納します。 テンプレートは一緒にデプロイすることも、CI/CD プロセスの一環として個別にデプロイすることもできるため、自動化プロセスが簡単になります。

    このアーキテクチャでは、Azure Event Hubs、Log Analytics、および Azure Cosmos DB が 1 つのワークロードとして識別されます。 これらのリソースは、1 つの ARM テンプレートに含まれます。

  • ワークロードをステージングすることを検討してください。 さまざまなステージにデプロイし、各ステージで検証チェックを実行してから、次のステージに進みます。 これにより、高度に制御された方法で運用環境に更新プログラムをプッシュし、予期しないデプロイの問題を最小限に抑えることができます。

    このアーキテクチャには、複数のデプロイ ステージがあります。 Azure DevOps パイプラインを作成し、それらのステージを追加することを検討してください。 自動化できるステージの例を、次にいくつか示します。

    • Databricks クラスターの開始
    • Databricks CLI の構成
    • Scala ツールのインストール
    • Databricks シークレットの追加

    また、Databricks コードとそのライフ サイクルの品質と信頼性を向上させるために、自動化された統合テストを作成することを検討してください。

  • ストリーム処理パイプラインのパフォーマンスを分析するには、Azure Monitor の使用を検討してください。 詳細については、「Azure Databricks の監視」を参照してください。

詳細については、「Microsoft Azure Well-Architected Framework」の DevOps のセクションを参照してください。

コストの最適化

コストの最適化とは、不要な費用を削減し、運用効率を向上させる方法を検討することです。 詳しくは、コスト最適化の柱の概要に関する記事をご覧ください。

コストの見積もりには、Azure 料金計算ツールをご利用ください。 ここでは、この参照アーキテクチャで使用されるサービスに関するいくつかの考慮事項を示します。

Event Hubs

この参照アーキテクチャでは、Standard レベルで Event Hubs がデプロイされます。 価格モデルは、スループット ユニット、イングレス イベント、およびキャプチャ イベントに基づいています。 イングレス イベントは、64 KB 以下のデータの単位です。 よりサイズが大きいメッセージには、64 KB の倍数で課金されます。 スループット ユニットは、Azure portal または Event Hubs の管理 API のどちらかを使用して、指定します。

リテンション期間を延長する必要がある場合は、Dedicated レベルを検討してください。 このレベルでは、最も要求の厳しい要件でのシングルテナント デプロイを提供します。 このオファリングでは、スループット ユニットによる制限を受けない容量ユニット (CU) に基づいてクラスターが構築されます。

また、Standard レベルでも、イングレス イベントとスループット ユニットに基づいて課金されます。

Event Hubs の価格については、「Event Hubs の価格」を参照してください。

Azure Databricks

Azure Databricks では StandardPremium の 2 つのサービス レベルを提供しており、それぞれが 3 つのワークロードをサポートしています。 この参照アーキテクチャでは、Premium レベルでの Azure Databricks ワークスペースをデプロイします。

Data Engineering および Data Engineering Light ワークロードは、データ技術者がジョブを構築して実行するためにあります。 Data Analytics ワークロードは、データ科学者がデータと分析情報を対話的に探索、視覚化、操作、および共有するためにあります。

Azure Databricks では、多くの価格モデルを提供しています。

  • 従量課金制プラン

    クラスターにプロビジョニングされた仮想マシン (VM) と、選択した VM インスタンスに基づく Databricks ユニット (DBU) に対して課金されます。 DBU は処理能力の単位であり、1 秒あたりの使用量に対して課金されます。 DBU の消費量は、Azure Databricks を実行しているインスタンスのサイズと種類によって異なります。 価格は、選択したワークロードとレベルに応じて異なります。

  • 事前購入プラン

    1 年または 3 年間のどちらかの期間、Databricks Commit ユニット (DBCU) としての Azure Databricks ユニット (DBU) を確約します。 従量課金制モデルと比較した場合、最大 37% を節約できます。

詳細については、「Azure Databricks の価格」を参照してください。

Azure Cosmos DB

このアーキテクチャでは、一連のレコードが Azure Databricks ジョブによって Azure Cosmos DB に書き込まれます。 予約している容量に対して課金されます。これは、挿入操作の実行に使用される 1 秒あたりの要求ユニット (RU/秒) で表されます。 課金単位は、1 時間あたり 100 RU/秒です。 たとえば、100 KB の項目を書き込むコストは 50 RU/秒です。

書き込み操作の場合、1 秒あたりに必要な書き込みの数をサポートするために十分な容量をプロビジョニングします。 書き込み操作の実行前にポータルまたは Azure CLI を使用してプロビジョニング済みスループットを増大させ、それらの操作の完了後にスループットを減少させることができます。 書き込み期間中のスループットは、指定されたデータに必要な最小スループットと、他のワークロードが実行されていないと仮定した場合に挿入操作に必要なスループットです。

コスト分析の例

1 つのコンテナー上にスループット値 1000 RU/秒を構成するとします。 24 時間かつ 30 日間で、合計で 720 時間としてデプロイされています。

コンテナーは、1 時間ごとに 100 RU/秒の 10 ユニットで課金されます。 $0.008 (1 時間あたり 100 RU/秒) の 10 単位で、1 時間あたり $0.08 が請求されます。

720 時間、つまり (100 RU/秒の) 7,200 ユニットでは、月額 $57.60 が課金されます。

また、ストレージも、格納データとインデックスに使用される GB ごとに課金されます。 詳細については、Azure Cosmos DB の価格モデルに関するページを参照してください。

ワークロード コストをすばやく見積もるには、Azure Cosmos DB 容量計算ツールを使用します。

詳細については、「Microsoft Azure Well-Architected Framework」のコストのセクションを参照してください。

このシナリオのデプロイ

リファレンス実装をデプロイおよび実行するには、GitHub readme の手順に従ってください。

次のステップ