次の方法で共有


Azure Databricks によるストリーム処理

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

この参照アーキテクチャでは、エンド ツー エンドのストリーム処理パイプラインを示します。 このパイプラインの 4 つのステージには、取り込み、処理、格納、および分析とレポートが含まれます。 この参照アーキテクチャでは、パイプラインは、2 つのソースからデータを取り込み、各ストリームの関連するレコードに対して結合を実行し、結果を強化させ、リアルタイムで平均を計算します。 結果は、さらに分析するために格納されます。

アーキテクチャ

Azure Databricks を使用したストリーム処理の参照アーキテクチャを示す図。

このアーキテクチャの Visio ファイル をダウンロードします。

データ フロー

次のデータ フローは、前の図に対応しています。

  1. Ingest

    2 つのリアルタイムの運用データ ストリームによって、 料金 データと 乗車 データがシステムに供給されます。 タクシーにインストールされたデバイスは、データ ソースとして機能し、Azure Event Hubs にイベントを発行します。 各ストリームは、独立したインジェスト パスを提供する独自のイベント ハブ インスタンスに送信されます。

  2. 処理

    Azure Databricks は両方の Event Hubs ストリームを使用し、次の操作を実行します。

    • 運賃レコードと乗車レコードの関連付け
    • Azure Databricks ファイル システムに格納されている近隣参照データを含む 3 番目のデータセットを使用して、データを強化します

    このプロセスにより、ダウンストリームの分析とストレージに適した統合されたエンリッチメントされたデータセットが生成されます。

  3. Store

    Azure Databricks ジョブの出力は、一連のレコードです。 処理されたレコードは、Azure Cosmos DB for NoSQL に書き込まれます。

  4. 分析/レポート

    Fabric は、Azure Cosmos DB for NoSQL の運用データをミラーリングして、 トランザクション のパフォーマンスに影響を与えずに分析クエリを有効にします。 この方法では、分析用の ETL なしパスが提供されます。 このアーキテクチャでは、次の目的でミラーリングを使用できます。

    • Azure Cosmos DB データ (またはデルタ形式のデータ) を Fabric にミラー化する
    • 運用システムとデータセットの同期を維持する
    • 次のツールを使用して分析を有効にします。
      • Lakehouse および Warehouse 向け Fabric SQL 分析エンドポイント
      • Apache Spark ノートブック
      • Kusto クエリ言語 (KQL) を使用した時系列およびログ スタイルの探索によるリアルタイム分析
  5. モニター

    Azure Monitor は、Azure Databricks 処理パイプラインからテレメトリを収集します。 Log Analytics ワークスペースには、アプリケーション ログとメトリックが格納されます。 次のアクションを実行できます。

    • 操作ログのクエリを実行する
    • メトリックを視覚化する
    • エラー、異常、およびパフォーマンスの問題を検査する
    • ダッシュボードを作成する

Components

  • Azure Databricks は、Azure プラットフォーム用に最適化された Spark ベースの分析プラットフォームです。 このアーキテクチャでは、Azure Databricks ジョブによってタクシー乗車と料金のデータが強化され、結果が Azure Cosmos DB に格納されます。

  • Event Hubs は、大量のイベントを取り込むためのスケーリングが可能な、管理された分散インジェスト サービスです。 このアーキテクチャでは、2 つのイベント ハブ インスタンスを使用してタクシーからデータを受信します。

  • Azure Cosmos DB for NoSQL は、管理された複数モデルのデータベース サービスです。 このアーキテクチャでは、Azure Databricks エンリッチメント ジョブの出力が格納されます。 Fabric は Azure Cosmos DB の運用データをミラーリング して、分析クエリを有効にします。

  • Log Analytics は、さまざまなソースからのログ データのクエリと分析に役立つ Azure Monitor 内のツールです。 このアーキテクチャでは、すべてのリソースがこのワークスペースにプラットフォーム ログを格納するように Azure Diagnostics を構成します。 ワークスペースは、Azure Databricks 処理パイプラインから出力される Spark ジョブ メトリックのデータ シンクとしても機能します。

シナリオの詳細

タクシー会社は、各タクシー乗車に関するデータを収集します。 このシナリオでは、2 つの個別のデバイスがデータを送信することを前提としています。 タクシーには、所要時間、距離、乗車場所と降車場所など、各乗車に関する情報を送信するメーターがあります。 別のデバイスでは、乗客からの支払いを受け付け、料金に関するデータを送信します。 タクシー会社は、ライダーの傾向を特定するために、各近隣の 1 マイルあたりの平均チップをリアルタイムで計算したいと考えています。

データ インジェスト

データ ソースをシミュレートするために、この参照アーキテクチャでは ニューヨーク市のタクシー データ データセットを使用します。 このデータセットには、2010 年から 2013 年までのニューヨーク市でのタクシー乗車に関するデータが含まれています。 乗車データと料金データ レコードの両方が含まれています。 乗車データには、乗車時間、乗車距離、乗車場所と降車場所が含まれます。 料金データには、料金、税、チップの金額が含まれます。 両方のレコードの種類のフィールドには、medallion 番号、ハック ライセンス、ベンダー ID が含まれます。 これら 3 つのフィールドの組み合わせは、タクシーとドライバーを一意に識別します。 データは CSV 形式で保存されます。

データ ジェネレーターは、レコードを読み取って Event Hubs に送信する .NET Core アプリケーションです。 ジェネレーターは、JSON 形式の乗車データと CSV 形式の料金データを送信します。

Event Hubs では、 パーティション を使用してデータをセグメント化します。 パーティションを使用すると、コンシューマーは各読み取りデータを並列で読み取ることができます。 Event Hubs にデータを送信するときに、パーティション キーを直接指定できます。 それ以外の場合は、ラウンド ロビン方式でパーティションにレコードが割り当てられます。

このシナリオでは、乗車データと料金データに、特定のタクシーに対して同じパーティション ID を割り当てる必要があります。 この割り当てにより、Databricks は 2 つのストリームを関連付けるときに並列処理の程度を適用できます。 たとえば、乗車データの n パーティション内のレコードは、料金データのパーティション n のレコードと一致します。

Azure Databricks と Event Hubs によるストリーム処理のダイアグラム。

このアーキテクチャの Visio ファイルをダウンロード します。

データ ジェネレーターでは、両方のレコードの種類に対応した共通データ モデルに、 PartitionKeyMedallionHackLicenseを連結した VendorId プロパティがあります。

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

このプロパティは、Event Hubs にデータを送信するときに明示的なパーティション キーを提供します。

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

Event Hubs のスループット容量は、 スループット ユニット で測定されます。 自動拡張を有効にすると、イベント ハブを自動的にスケーリングできます。 この機能は、構成された最大値まで、トラフィックに基づいてスループット ユニットを自動的にスケーリングします。

ストリーム処理

Azure Databricks では、ジョブがデータ処理を実行します。 ジョブはクラスターに割り当てられ、そのジョブで実行されます。 ジョブには、Java で記述されたカスタム コードや、Spark ノートブックを使用できます。

この参照アーキテクチャでは、ジョブは Java と Scala で記述されたクラスを含む Java アーカイブです。 Azure Databricks ジョブの Java アーカイブを指定すると、Azure Databricks クラスターによって操作のクラスが指定されます。 ここで、 main クラスの com.microsoft.pnp.TaxiCabReader メソッドにはデータ処理ロジックが含まれています。

2 つのイベント ハブ インスタンスからストリームを読み取ります

データ処理ロジックでは、2 つの Azure イベント ハブ インスタンスからの読み取りに、 Spark 構造化ストリーミング が使用されます。

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

近隣情報を使用してデータを強化する

乗車データには、乗車場所と降車場所の緯度と経度の座標が含まれます。 これらの座標は便利ですが、分析には簡単には使用できません。 そのため、パイプラインはシェープファイルから読み取った近隣データでこのデータを強化します。

シェープファイル形式はバイナリであり、簡単には解析されません。 ただし、GeoTools ライブラリには、シェープファイル形式を使用する地理空間データ用のツールが用意されています。 このライブラリは、com.microsoft.pnp.GeoFinder クラスで、乗車場所と降車場所の座標に基づいて近隣名を決定するために使用されます。

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

乗車データと料金データを結合する

最初に、乗車データと料金データが変換されます。

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

次に、乗車データが料金データと結合されます。

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

データを処理して Azure Cosmos DB に挿入する

各近隣の平均料金は、特定の時間間隔で計算されます。

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

その後、平均料金が Azure Cosmos DB に挿入されます。

maxAvgFarePerNeighborhood
  .writeStream
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
  .option("spark.cosmos.accountKey", "<your-cosmos-key>")
  .option("spark.cosmos.database", "<your-database-name>")
  .option("spark.cosmos.container", "<your-container-name>")
  .option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
  .outputMode("append")
  .start()
  .awaitTermination()

考慮事項

これらの考慮事項は、ワークロードの品質向上に使用できる一連の基本原則である Azure Well-Architected Framework の要素を組み込んでいます。 詳細については、「 Well-Architected Framework」を参照してください。

セキュリティ

セキュリティは、意図的な攻撃や貴重なデータとシステムの誤用に対する保証を提供します。 詳細については、「セキュリティ設計レビューチェックリスト」を参照してください。

Azure Databricks ワークスペースへのアクセスは、管理者コンソールを使用して制御されます。 管理者コンソールには、ユーザーの追加、ユーザーのアクセス許可の管理、シングル サインオンの設定を行う機能が含まれています。 ワークスペース、クラスター、ジョブ、およびテーブルのアクセス制御も、管理者コンソールで設定できます。

シークレットを管理する

Azure Databricks には、資格情報を格納し、ノートブックやジョブで参照するために使用される シークレット ストア が含まれています。 Azure Databricks シークレット ストア内のパーティション シークレットをスコープします。

databricks secrets create-scope --scope "azure-databricks-job"

シークレットは、スコープ レベルで追加されます。

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

ネイティブの Azure Databricks スコープの代わりに、Azure Key Vault ベースのスコープ を使用します。

コードは、Azure Databricks シークレット ユーティリティを使用して シークレットにアクセスします

コストの最適化

コストの最適化では、不要な経費を削減し、運用効率を向上させる方法に重点を置いています。 詳細については、「コストの最適化設計レビューチェックリスト」を参照してください。

コストの見積もりには、Azure 料金計算ツール をご利用ください。 この参照アーキテクチャで使用される次のサービスについて考えてみましょう。

Event Hubs のコストに関する考慮事項

この参照アーキテクチャでは、Standard レベルで Event Hubs をデプロイします。 価格モデルは、スループット ユニット、イングレス イベント、およびキャプチャ イベントに基づいています。 イングレス イベントは、64 KB 以下のデータの単位です。 よりサイズが大きいメッセージには、64 KB の倍数で課金されます。 スループット ユニットは、Azure portal または Event Hubs の管理 API のどちらかを使用して、指定します。

保持日数を増やす必要がある場合は、専用レベルを検討してください。 このレベルでは、厳しい要件を持つシングルテナントデプロイが提供されます。 このオファリングは、容量ユニットに基づいてクラスターを構築し、スループット ユニットに依存しません。 Standard レベルは、イングレス イベントとスループット ユニットに基づいて課金されます。

詳細については、「Event Hubs の価格」を参照してください。

Azure Databricks のコストに関する考慮事項

Azure Databricks には Standard レベルと Premium レベルが用意されており、どちらも 3 つのワークロードをサポートしています。 この参照アーキテクチャでは、Premium レベルに Azure Databricks ワークスペースがデプロイされます。

データ エンジニアリング ワークロードは、ジョブ クラスターで実行する必要があります。 データ エンジニアは、クラスターを使用してジョブを構築および実行します。 データ分析ワークロードは、万能クラスターで実行する必要があり、データ サイエンティストが対話形式でデータと分析情報を探索、視覚化、操作、共有することを目的としています。

Azure Databricks には、複数の価格モデルが用意されています。

  • 従量課金制プランの する

    選択した VM インスタンスに基づいて、クラスターと Azure Databricks ユニット (DBU) でプロビジョニングされた仮想マシン (VM) に対して課金されます。 DBU は、Azure が 1 秒あたりの使用量で課金する処理機能の単位です。 DBU の使用量は、Azure Databricks で実行されるインスタンスのサイズと種類によって異なります。 価格は、選択したワークロードと階層によって異なります。

  • 購入前プランの

    従量課金制モデルと比較して、その期間の総保有コストを削減するために、1 年または 3 年間、Azure Databricks コミット ユニットとして DBU にコミットします。

詳細については、Azure Databricks の価格 に関するページを参照してください。

Azure Cosmos DB のコストに関する考慮事項

このアーキテクチャでは、Azure Databricks ジョブは一連のレコードを Azure Cosmos DB に書き込みます。 予約した容量に対して課金されます。これは 1 秒あたりの要求ユニット数 (RU/秒) で測定されます。 この容量は、挿入操作を実行するために使用されます。 課金の単位は 1 時間あたり 100 RU/秒です。 たとえば、100 KB の項目を書き込むコストは 50 RU/秒です。

書き込み操作の場合は、1 秒あたりに必要な書き込みの数をサポートするのに十分な容量を設定します。 書き込み操作を実行する前にポータルまたは Azure CLI を使用し、それらの操作が完了した後でスループットを減らすことで、プロビジョニングされたスループットを増やすことができます。 書き込み期間のスループットは、特定のデータに必要な最小スループットと挿入操作に必要なスループットの合計です。 この計算では、他のワークロードが実行されていないことを前提としています。

コスト分析の例

コンテナーでスループット値 1,000 RU/秒を構成し、720 時間に相当する 30 日間連続して実行するとします。

コンテナーは、1 時間あたり 100 RU/秒の 10 単位で課金されます。 $0.008 (1 時間あたり 100 RU/秒あたり) の 10 ユニットは、1 時間あたり $0.08 で課金されます。

720 時間または 7,200 ユニット (100 RU) の場合、その月に対して 57.60 ドルが課金されます。

ストレージは、格納されているデータとインデックスに使用される GB ごとにも課金されます。 詳細については、 Azure Cosmos DB の価格モデルに関するページを参照してください。

ワークロード コストの見積もりを簡単に行うには、Azure Cosmos DB 容量計算ツール を使用します。

オペレーショナル エクセレンス

オペレーショナル エクセレンスは、アプリケーションをデプロイし、運用環境で実行し続ける運用プロセスを対象としています。 詳細については、「オペレーショナル エクセレンス設計レビュー チェックリスト」を参照してください。

監視

Azure Databricks は Apache Spark に基づいています。 Azure Databricks と Apache Spark はどちらも、Apache Log4j をログ記録用の標準ライブラリとして使用します。 Apache Spark で提供される既定のログ記録に加えて、Log Analytics にログ記録を実装できます。 詳細については、「Azure Databricks の監視」を参照してください。

com.microsoft.pnp.TaxiCabReader クラスが乗車メッセージと料金メッセージを処理するため、メッセージの形式が正しくないため、無効になる可能性があります。 運用環境では、これらの形式が正しくないメッセージを分析してデータ ソースの問題を特定し、データ損失を防ぐために迅速に修正できるようにすることが重要です。 com.microsoft.pnp.TaxiCabReader クラスは、不正な料金レコードと乗車レコードの数を追跡する Apache Spark アキュムレータを登録します。

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark は Dropwizard ライブラリを使用してメトリックを送信します。 一部のネイティブの Dropwizard メトリック フィールドは Log Analytics と互換性がありません。このため、この参照アーキテクチャにはカスタムの Dropwizard シンクとレポーターが含まれています。 Log Analytics が想定する形式でメトリックを書式設定します。 Apache Spark によってメトリックがレポートされると、形式に誤りがある乗車データと料金データに対するカスタム メトリックも送信されます。

Log Analytics ワークスペースで次のクエリ例を使用して、ストリーミング ジョブの操作を監視できます。 各クエリの引数 ago(1d) は、最後の日に生成されたすべてのレコードを返します。 このパラメーターを調整して、別の期間を表示できます。

ストリーム クエリ操作中にログに記録される例外

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

形式に誤りがある料金データと乗車データの蓄積

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

時間の経過に伴うジョブ操作

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

リソースの編成とデプロイ

  • 運用、開発、およびテスト環境それぞれに対して個別のリソース グループを作成してください。 個別のリソース グループにより、デプロイの管理、テスト デプロイの削除、およびアクセス権の割り当てが行いやすくなります。

  • Azure Resource Manager テンプレート を使用して、コードとしてのインフラストラクチャ プロセスに従って Azure リソースをデプロイします。 テンプレートを使用すると、 Azure DevOps サービス またはその他の継続的インテグレーションおよび継続的デリバリー (CI/CD) ソリューションを使用してデプロイを自動化できます。

  • 各ワークロードを別々のデプロイ テンプレートに配置し、リソースをソース管理システムに格納します。 テンプレートは一緒にデプロイすることも、CI/CD プロセスの一環として個別にデプロイすることもできます。 この方法により、自動化プロセスが簡略化されます。

    このアーキテクチャでは、Event Hubs、Log Analytics、および Azure Cosmos DB が 1 つのワークロードとして識別されます。 これらのリソースは、1 つの Azure Resource Manager テンプレートに含まれています。

  • ワークロードをステージングすることを検討してください。 次のステージに進む前に、さまざまなステージにデプロイし、各ステージで検証チェックを実行します。 そうすることで、運用環境に更新プログラムをプッシュする方法を制御し、予期しないデプロイの問題を最小限に抑えることができます。

    このアーキテクチャには、複数のデプロイ ステージがあります。 Azure DevOps パイプラインを作成し、それらのステージを追加することを検討してください。 次のステージを自動化できます。

    • Azure Databricks クラスターを起動します。
    • Azure Databricks CLI を構成します。
    • Scala ツールをインストールします。
    • Azure Databricks シークレットを追加します。

    Azure Databricks コードとそのライフ サイクルの品質と信頼性を向上させるために、自動化された統合テストを記述することを検討してください。

次のステップ