Azure Data Factory および Azure Synapse Analytics 内の Data Flow アクティビティ
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法についてはこちらでご確認ください。
データ フロー アクティビティを使用して、Mapping Data Flow を介してデータを変換および移動します。 データ フローを初めて扱う場合は、Mapping Data Flow の概要に関するページを参照してください。
UI を使用して Data Flow アクティビティを作成する
パイプライン内で Data Flow アクティビティを使用するには、次の手順を実行します。
パイプラインの [アクティビティ] ペイン内で Data Flow を検索し、Data Flow アクティビティをパイプライン キャンバスにドラッグします。
キャンバス上で新しいデータ フロー アクティビティ (まだ選択されていない場合)、[設定] タブの順に選択して、詳細を編集します。
チェックポイント キーは、データ フローが変更されたデータ キャプチャに使用されるときにチェックポイントを設定するために使用されます。 上書きすることができます。 データ フロー アクティビティでは、"パイプライン名 + アクティビティ名" ではなくチェックポイント キーとして guid 値を使用するため、名前変更アクションがある場合でも、顧客の変更データ キャプチャの状態を常に追跡できます。 既存のすべてのデータ フロー アクティビティは、下位互換性のために古いパターン キーを使用します。 変更データ キャプチャが有効なデータ フロー リソースを使用して、新しいデータ フロー アクティビティをパブリッシュした後のチェックポイント キーのオプションを次に示します。
既存のデータ フローを選択するか、[新規作成] ボタンを使用して新しく作成します。 必要に応じてその他のオプションを選択し、構成を完了します。
構文
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
型のプロパティ
プロパティ | 説明 | 使用できる値 | 必須 |
---|---|---|---|
dataflow | 実行されているデータ フローへの参照 | DataFlowReference | はい |
integrationRuntime | データ フローが実行されているコンピューティング環境です。 指定されていない場合は、自動解決 Azure 統合ランタイムが使用されます。 | IntegrationRuntimeReference | いいえ |
compute.coreCount | Spark クラスター内で使用されるコアの数です。 自動解決 Azure 統合ランタイムが使用されている場合にのみ指定できます | 8、16、32、48、80、144、272 | いいえ |
compute.computeType | Spark クラスター内で使用されるコンピューティングの種類です。 自動解決 Azure 統合ランタイムが使用されている場合にのみ指定できます | "一般" | いいえ |
staging.linkedService | Azure Synapse Analytics ソースまたはシンクを使用している場合は、PolyBase ステージングに使用するストレージ アカウントを指定します。 Azure Storage が VNet サービス エンドポイントを使用して構成されている場合は、ストレージ アカウントで [信頼された Microsoft サービスを許可する] を有効にしたマネージド ID 認証を使用する必要があります。「Azure Storage で VNet サービス エンドポイントを使用した場合の影響」を参照してください。 また、Azure Blob と Azure Data Lake Storage Gen2 に必要な構成についても説明します。 |
LinkedServiceReference | データ フローが Azure Synapse Analytics に対して読み取りまたは書き込みを行う場合のみ |
staging.folderPath | Azure Synapse Analytics ソースまたはシンクを使用している場合は、PolyBase ステージングに使用する BLOB ストレージ アカウント内のフォルダー パス | String | データ フローが Azure Synapse Analytics に対して読み取りまたは書き込みを行う場合のみ |
traceLevel | データ フロー アクティビティの実行のログ レベルを設定します | Fine、Coarse、None | No |
実行時、データ フロー コンピューティングのサイズを動的に設定する
Core Count プロパティと Compute Type プロパティは、実行時に入ってくるソース データのサイズに合わせて調整されるよう、動的に設定できます。 ソース データセット データのサイズを見つける目的で、Lookup や Get Metadata など、パイプライン アクティビティを使用します。 次に、Data Flow アクティビティ プロパティで Add Dynamic Content を使用します。 小、中、または大規模のコンピューティング サイズを選択できます。 必要に応じて、[カスタム] を選択し、コンピューティングの種類とコア数を手動で構成します。
こちらの短い動画チュートリアルでこの手法について説明しています
データ フロー統合ランタイム
データ フロー アクティビティの実行に使用する統合ランタイムを選択します。 既定では、このサービスは 4 つのワーカー コアを持つ自動解決 Azure 統合ランタイムを使用します。 この IR は汎用目的のコンピューティングの種類で、ご使用のサービス インスタンスと同じリージョンで実行します。 運用可能なパイプラインとするには、データ フロー アクティビティの実行用に特定のリージョン、コンピューティングの種類、コア数、および TTL を定義する独自の Azure 統合ランタイムを作成することを強くお勧めします。
General Purpose の最小コンピューティング タイプが 8 + 8 (合計 16 個の v コア) の構成で、ほとんどの運用ワークロードの最小推奨は 10 分の Time to live (TTL) です。 小さい TTL を設定することにより、Azure IR は、コールド クラスターのような数分の開始時間を要さないウォーム クラスターを維持できます。 詳細については、Azure 統合ランタイムに関するページを参照してください。
重要
データ フロー アクティビティでの Integration Runtime の選択は、お使いのパイプラインのトリガー済みの実行のみに適用されます。 データ フローを使用したパイプラインのデバッグは、デバッグ セッションで指定されたクラスターで実行されます。
PolyBase
Azure Synapse Analytics をシンクまたはソースとして使用する場合は、PolyBase バッチ読み込み用のステージングの場所を選択する必要があります。 PolyBase を使用すると、データを行ごとに読み込む代わりに一括してバッチ読み込みを行うことができます。 PolyBase を実行すると、Azure Synapse Analytics への読み込み時間が大幅に短縮されます。
チェックポイント キー
データ フロー ソースの変更キャプチャ オプションを使用すると、ADF はユーザーの代わりに自動的にチェックポイントの保持と管理を行います。 既定のチェックポイント キーは、データ フロー名とパイプライン名のハッシュです。 ソース テーブルまたはフォルダーに動的パターンを使用する場合は、このハッシュを上書きして、独自のチェックポイント キーの値をここに設定することをお勧めします。
ログ記録レベル
データ フロー アクティビティのすべてのパイプライン実行がすべての詳細なテレメトリ ログを完全にログ記録する必要がない場合は、必要に応じてログ レベルを "Basic" または "None" に設定できます。 データ フローを "Verbose" モード (既定値) で実行している場合、データ変換中に個別のパーティション レベルのそれぞれでアクティビティを完全にログ記録するように、サービスに要求していることになります。 これは負荷の高い操作であるため、トラブルシューティングを行うときにのみ詳細を有効にすることで、データ フローとパイプラインのパフォーマンス全体を向上させることができます。 "Basic" モードは変換の実行時間だけをログ記録し、"None" は実行時間の要約だけを記録します。
シンクのプロパティ
データ フローのグループ化機能を使用すると、シンクの実行順序を設定できるだけでなく、同じグループ番号を使用してシンクをグループ化できます。 グループを管理しやすくするため、シンクを同じグループ内で並列で実行するように、サービスに要求できます。 また、いずれかのシンクでエラーが発生しても続行するようにシンク グループを設定することもできます。
データ フロー シンクの既定の動作では、各シンクが逐次実行され、シンクでエラーが発生した場合はデータ フローが失敗します。 さらに、データ フロー プロパティでシンクに異なる優先順位を設定しない限り、すべてのシンクは既定で同じグループに設定されます。
First row only (先頭行のみ)
このオプションは、"アクティビティへの出力" でキャッシュ シンクが有効になっているデータ フローでのみ使用できます。 パイプラインに直接挿入されるデータ フローからの出力は、2 MB に制限されます。 "最初の行のみ" を設定すると、データ フロー アクティビティの出力をパイプラインに直接挿入する際に、データ フローからのデータ出力を制限することができます。
データ フローをパラメーター化する
パラメーター化されたデータセット
データ フローでパラメーター化されたデータセットを使用する場合は、 [設定] タブでパラメーター値を設定します。
パラメーター化されたデータ フロー
データ フローがパラメーター化されている場合は、 [パラメーター] タブでデータ フロー パラメーターの動的な値を設定します。パイプライン式言語またはデータ フロー式言語のいずれかを使用して、動的パラメーター値またはリテラル パラメーター値を割り当てることができます。 詳しくは、データ フロー パラメーターに関するページを参照してください。
パラメーター化されたコンピューティングのプロパティ
自動解決 Azure 統合ランタイムを使用し、かつ compute.coreCount と compute.computeType の値を指定すると、コア カウントやコンピューティングの種類をパラメーター化することができます。
データ フロー アクティビティのパイプライン デバッグ
データ フロー アクティビティを使用してデバッグ パイプラインを実行するには、上部バーにある [Data Flow Debug](データ フロー デバッグ) スライダーを使用して、データ フロー デバッグ モードをオンに切り替える必要があります。 デバッグ モードでは、アクティブな Spark クラスターに対してデータ フローを実行できます。 詳細については、デバッグ モードに関するページを参照してください。
デバッグ パイプラインは、データ フロー アクティビティ設定で指定された統合ランタイム環境ではなく、アクティブなデバッグ クラスターに対して実行されます。 デバッグ モードを開始するときに、デバッグ コンピューティング環境を選択できます。
データ フロー アクティビティを監視する
データ フロー アクティビティには、パーティション分割、ステージ時間、およびデータ系列の情報を表示できる特別な監視エクスペリエンスがあります。 [アクション] の下にある眼鏡アイコンを使用して、[監視] ウィンドウを開きます。 詳しくは、データ フローの監視に関するページを参照してください。
後続のアクティビティでデータ フロー アクティビティの結果を使用する
データ フロー アクティビティは、各シンクに書き込まれた行の数と各ソースから読み取られた行に関するメトリックを出力します。 これらの結果は、アクティビティの実行結果の output
セクションに返されます。 返されるメトリックは、以下の JSON の形式です。
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
たとえば、'dataflowActivity' という名前のアクティビティで、'sink1' という名前のシンクに書き込まれた行の数を取得するには、@activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten
を使用します。
このシンクで使用されていた、'source1' という名前のソースから読み取られた行の数を取得するには、@activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead
を使用します。
Note
シンクに書き込まれた行が 0 の場合は、メトリックに表示されません。 存在を確認するには、contains
関数を使用します。 たとえば、contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1')
は、sink1 に何らかの行が書き込まれたかどうかを確認します。
関連するコンテンツ
サポートされている制御フロー アクティビティを参照してください。