Azure Data Factory でカスタム イベント トリガーを作成してパイプラインを実行する

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

イベントドリブン アーキテクチャ (EDA) は、イベントの運用、検出、使用、および応答を含む一般的なデータ統合パターンです。 多くの場合、データ統合シナリオでは、特定のイベントが発生したときに Azure Data Factory ユーザーがパイプラインをトリガーする必要があります。 Data Factory と Azure Event Grid がネイティブに統合され、カスタム トピックがカバーされました。 イベント グリッド トピックにイベントを送信します。 Data Factory では、トピックをサブスクライブしてリッスンし、それに応じてパイプラインをトリガーします。

注意

この記事で説明されている統合は、Azure Event Grid に依存しています。 サブスクリプションが Event Grid リソース プロバイダーに登録されていることを確認してください。 詳細については、「リソース プロバイダーと種類」を参照してください。 Microsoft.EventGrid/eventSubscriptions/ アクションを実行できる必要があります。 このアクションは、EventGrid EventSubscription 共同作成者の組み込みロールの一部です。

重要

Azure Synapse Analytics でこの機能を使用している場合は、サブスクリプションも Data Factory リソース プロバイダーに登録されている必要があります。そうでなければ、""イベント サブスクリプション" の作成に失敗した" というエラーが表示されます。

パイプライン パラメーターとカスタム イベント トリガーを組み合わせると、パイプラインの実行でカスタム data ペイロードを解析して参照できます。 カスタム イベント ペイロードの data フィールドは自由形式の JSON キー値構造なので、イベント駆動型パイプラインの実行を制御できます。

重要

パラメーター化で参照されるキーがカスタム イベント ペイロードに存在しない場合、trigger run は失敗します。 プロパティ keyName が存在しないため式を評価できないというエラーが表示されます。 この場合、イベントによってトリガーされる pipeline runありません

Event Grid でカスタム トピックを設定する

Data Factory でカスタム イベント トリガーを使用するには、"まず" Event Grid でカスタム トピックを設定する必要があります。

Azure Event Grid に移動し、自分でトピックを作成します。 カスタム トピックを作成する方法の詳細については、Azure Event Grid のポータルのチュートリアルCLI のチュートリアルに関する記事を参照してください。

注意

このワークフローは、ストレージ イベント トリガーとは異なります。 ここでは、トピックは Data Factory によって設定されません。

Data Factory は、イベントが Event Grid イベント スキーマに従うことを想定しています。 イベント ペイロードに次のフィールドがあることを確認してください。

[
  {
    "topic": string,
    "subject": string,
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data":{
      object-unique-to-each-publisher
    },
    "dataVersion": string,
    "metadataVersion": string
  }
]

Data Factory を使用してカスタム イベント トリガーを作成する

  1. Azure Data Factory に移動してサインインします。

  2. [編集] タブに切り替えます。鉛筆アイコンを探します。

  3. メニューの [トリガー] を選択し、 [新規作成/編集] を選択します。

  4. [Add Triggers](トリガーの追加) ページで、 [Choose trigger](トリガーの選択) を選択し、 [+ 新規] を選択します。

  5. [種類][カスタム イベント] を選択します。

    Screenshot of Author page to create a new custom event trigger in Data Factory UI.

  6. [Azure サブスクリプション] ドロップダウンからカスタム トピックを選択するか、イベント トピックのスコープを手動で入力します。

    注意

    Data Factory でカスタム イベント トリガーを作成または変更するには、適切なロールベースのアクセス制御 (Azure RBAC) を持つ Azure アカウントを使用する必要があります。 追加のアクセス許可は不要です。 Data Factory のサービス プリンシパルでは、Event Grid に対する特別なアクセス許可は必要ありません。 アクセス制御の詳細については、「ロールベースのアクセス制御」セクションを参照してください。

  7. [次で始まるサブジェクト] および [次で終わるサブジェクト] プロパティを使用して、トリガー イベントをフィルター処理できます。 これらのプロパティは両方とも省略可能です。

  8. [+ 新規] を使用して、フィルター処理する [イベントの種類] を追加します。 カスタム イベント トリガーの一覧では、OR リレーションシップが使用されます。 eventType プロパティを持つカスタム イベントが一覧上の 1 つと一致すると、パイプラインの実行がトリガーされます。 イベントの種類では大文字と小文字が区別されます。 たとえば、次のスクリーンショットでは、トリガーは、サブジェクトが factories で始まるすべての copycompleted または copysucceeded イベントと一致します。

    Screenshot of Edit Trigger page to explain Event Types and Subject filtering in Data Factory UI.

  9. カスタム イベント トリガーを使用して、カスタム data ペイロードを解析し、パイプラインに送信できます。 パイプライン パラメーターを作成し、 [パラメーター] ページで値を入力します。 形式 @triggerBody().event.data._keyName_ を使用してデータ ペイロードを解析し、パイプラインのパラメーターに値を渡します。

    詳細については、次の記事を参照してください。

    Screenshot of pipeline parameters settings.

    Screenshot of the parameters page to reference data payload in custom event.

  10. パラメーターを入力したら、 [OK] を選択します。

高度なフィルター処理

カスタム イベント トリガーでは、Event Grid の高度なフィルター処理と同様の高度なフィルター処理機能がサポートされています。 これらの条件付きフィルターを使用すると、イベント ペイロードのに基づいてパイプラインをトリガーできます。 たとえば、イベント ペイロードに Department という名前のフィールドがあり、DepartmentFinance である場合にのみパイプラインをトリガーする必要があるとします。 date フィールドにはリスト [1, 2, 3, 4, 5] を、month フィールドの not 条件にはリスト [11, 12] を、tag フィールドは ['Fiscal Year 2021', 'FiscalYear2021', 'FY2021'] のいずれかを含む、というように複雑なロジックを指定することもできます。

Screenshot of setting advanced filters for customer event trigger

カスタム イベント トリガーでは現在、高度なフィルター処理演算子一部が Event Grid でサポートされています。 次のフィルター条件がサポートされています。

  • NumberIn
  • NumberNotIn
  • NumberLessThan
  • NumberGreaterThan
  • NumberLessThanOrEquals
  • NumberGreaterThanOrEquals
  • BoolEquals です。
  • StringContains
  • StringBeginsWith
  • StringEndsWith
  • StringIn
  • StringNotIn

[+ 新規] を選択して、新しいフィルター条件を追加します。

さらに、カスタム イベント トリガーは、次を含むイベント グリッドと同じ制限に従います。

  • カスタム イベント トリガーごとに、すべてのフィルターで 5 つの高度なフィルターと 25 のフィルター値
  • 文字列値あたり 512 文字
  • in と not in 演算子の 5 つの値
  • john.doe@contoso.com のように、キーに . (ドット) 文字を含めることはできません。 現時点では、エスケープ文字を含むキーはサポートされていません。
  • 複数のフィルターで同じキーを使用できます。

Data Factory は、Event Grid API の最新の GA バージョンに依存しています。 新しい API バージョンが GA 段階になると、Data Factory では、より高度なフィルター処理演算子のサポートが拡張されます。

JSON スキーマ

次の表に、カスタム イベント トリガーに関連するスキーマ要素の概要を示します。

JSON 要素 説明 Type 使用できる値 必須
scope イベント グリッド トピックの Azure Resource Manager リソース ID。 String Azure Resource Manager ID はい
events このトリガーを起動するイベントの種類。 文字列の配列 はい。少なくとも 1 つの値が必要です。
subjectBeginsWith subject フィールドは、トリガーを起動するために指定されたパターンで始まる必要があります。 たとえば、factories は、factories で始まるイベント サブジェクトに対してのみトリガーを発動させます。 String いいえ
subjectEndsWith subject フィールドは、トリガーを起動するために指定されたパターンで終わる必要があります。 String いいえ
advancedFilters それぞれフィルター条件を指定する JSON BLOB のリスト。 各 BLOB は keyoperatorTypevalues を指定します。 JSON BLOB のリスト いいえ

ロールベースのアクセス制御

Azure Data Factory では、Azure ロールベースのアクセス制御 (RBAC) を使用して未認可のアクセスを禁止します。 正常に機能するには、Data Factory で、以下を行うためのアクセス権が必要です。

  • イベントをリッスンする。
  • イベントから更新をサブスクライブする。
  • カスタム イベントにリンクされているパイプラインをトリガーする。

カスタム イベント トリガーを正常に作成または更新するには、適切なアクセス権を持つ Azure アカウントを使用して Data Factory にサインインする必要があります。 それ以外の場合は、"アクセス拒否" エラーが発生して操作が失敗します。

Data Factory には、Event Grid に対する特別なアクセス許可は必要ありません。 この操作のために Data Factory サービス プリンシパルに特別な Azure RBAC ロールのアクセス許可を割り当てる必要も "ありません"。

具体的には、/subscriptions/####/resourceGroups//####/providers/Microsoft.EventGrid/topics/someTopics に対する Microsoft.EventGrid/EventSubscriptions/Write 権限が必要です。

  • データ ファクトリ (開発環境内などの) で作成する場合は、サインインしている Azure アカウントに上記のアクセス許可が必要です。
  • CI/CD を使用して発行する場合、ARM テンプレートをテスト ファクトリと運用ファクトリに発行するために使用するアカウントには上記のアクセス許可が必要です。