チュートリアル: Azure Event Grid と Azure Functions を使用して、Event Hubs でキャプチャされたデータを Azure Storage から Azure Synapse Analytics に移行する
このチュートリアルでは、Azure Event Grid と Azure Functions を使用して、Event Hubs でキャプチャされたデータを Azure Blob Storage から Azure Synapse Analytics (具体的には専用 SQL プール) に移行します。
この図は、このチュートリアルでお客様が作成するソリューションのワークフローを示しています。
- Azure イベント ハブに送信されたデータが Azure BLOB ストレージにキャプチャされます。
- データのキャプチャが完了すると、イベントが生成されて Azure Event Grid に送信されます。
- Azure Event Grid は、このイベント データを Azure 関数アプリに転送します。
- 関数アプリによってイベント データの BLOB URL が使用され、ストレージから BLOB が取得されます。
- 関数アプリによって BLOB データが Azure Synapse Analytics に移行されます。
この記事では、次の手順を実行します。
- チュートリアルに必要なインフラストラクチャをデプロイする
- 関数アプリにコードを発行する
- Event Grid のサブスクリプションを作成する
- サンプル データを Event Hubs にストリーム配信する
- キャプチャされたデータを Azure Synapse Analytics で確認する
前提条件
このチュートリアルを完了するには、以下が必要です。
- この記事では、Event Grid と Event Hubs (特にキャプチャ機能) について理解していることを前提としています。 Azure Event Grid に慣れていない場合は、「Azure Event Grid とは」をご覧ください。 Azure Event Hubs のキャプチャ機能について、詳しくは「Azure Event Hubs で Azure Blob Storage または Azure Data Lake Storage にイベントをキャプチャする」をご覧ください。
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
- Visual Studio で次のワークロードを使用: .NET デスクトップ開発、Azure 開発、ASP.NET および Web 開発、Node.js 開発、Python 開発。
- 自分のコンピューターに EventHubsCaptureEventGridDemo サンプル プロジェクトをダウンロードします。
- WindTurbineDataGenerator - 風力タービンのサンプル データをキャプチャが有効なイベント ハブに送信する単純な発行元
- FunctionDWDumper – Azure Storage BLOB に Avro ファイルがキャプチャされたときに、Azure Event Grid からの通知を受信する Azure 関数。 BLOB の URI パスを受信し、その内容を読み取り、そのデータを Azure Synapse Analytics (専用 SQL プール) にプッシュします。
インフラストラクチャをデプロイする
この手順では、Resource Manager テンプレートを使用して必要なインフラストラクチャをデプロイします。 テンプレートをデプロイすると、次のリソースが作成されます。
- Capture 機能が有効なイベント ハブ
- キャプチャされたファイル用のストレージ アカウント
- 関数アプリをホストするための App Service プラン
- イベントを処理するための関数アプリ
- データ ウェアハウスをホストするための SQL Server
- 移行したデータを格納するための Azure Synapse Analytics (専用 SQL プール)
Azure CLI を使用してインフラストラクチャをデプロイする
Azure portal にサインインします。
上部にある [Cloud Shell] ボタンを選択します。
ブラウザーの下部に Cloud Shell が開かれるのがわかります。
- 初めて Cloud Shell を使用する場合:
[Bash] と [PowerShell] のどちらかを選択するオプションが表示されたら、[Bash] を選択します。
[ストレージの作成] を選択してストレージ アカウントを作成します。 Azure Cloud Shell では、一部のファイルを格納するために Azure ストレージ アカウントが必要です。
Cloud Shell が初期化されるまで待ちます。
- 初めて Cloud Shell を使用する場合:
Cloud Shell で、上の図に示すように [Bash] を選択します (まだ選択されていない場合)。
次の CLI コマンドを実行して、Azure リソース グループを作成します。
次のコマンドをコピーして Cloud Shell ウィンドウに貼り付けます。 必要に応じて、リソース グループの名前と場所を変更します。
az group create -l eastus -n rgDataMigration
Enterキーを押します。
次に例を示します。
user@Azure:~$ az group create -l eastus -n rgDataMigration { "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration", "location": "eastus", "managedBy": null, "name": "rgDataMigration", "properties": { "provisioningState": "Succeeded" }, "tags": null }
次の CLI コマンドを実行して、前のセクションで説明したリソース (イベント ハブ、ストレージ アカウント、関数アプリ、Azure Synapse Analytics) をすべてデプロイします。
コマンドをコピーして Cloud Shell ウィンドウに貼り付けます。 または、お好みのエディターにコピーして貼り付け、値を設定してから、コマンドを Cloud Shell にコピーすることもできます。
重要
コマンドを実行する前に、次のエンティティの値を指定します。
- お客様が先ほど作成したリソース グループの名前。
- イベント ハブ名前空間の名前。
- イベント ハブの名前。 値はそのまま (hubdatamigration) でもかまいません。
- SQL サーバーの名前。
- SQL ユーザーの名前とパスワード。
- データベースの名前。
- ストレージ アカウントの名前。
- 関数アプリの名前。
az deployment group create \ --resource-group rgDataMigration \ --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \ --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
Cloud Shell ウィンドウで Enter キーを押して、コマンドを実行します。 多数のリソースを作成するため、このプロセスには時間がかかる場合があります。 コマンドの結果で、エラーがないことを確認します。
ポータルの [Cloud Shell] ボタンまたは Cloud Shell ウィンドウの右上隅にある [X] ボタンを選択して Cloud Shell を閉じます。
リソースが作成されたことを確認する
Azure portal で、左側のメニューにある [リソース グループ] を選択します。
お客様のリソース グループの名前を検索ボックスに入力して、リソース グループの一覧をフィルター処理します。
一覧で、お客様のリソース グループを選択します。
次のリソースがリソース グループに表示されていることを確認します。
Azure Synapse Analytics でテーブルを作成する
このセクションでは、前に作成した専用 SQL プールにテーブルを作成します。
リソース グループ内のリソースの一覧で、専用 SQL プールを選択します。
[専用 SQL プール] ページにある左側のメニューの [主なタスク] セクションで、 [クエリ エディター (プレビュー)] を選択します。
SQL サーバーのユーザーの名前とパスワードを入力し、 [OK] を選択します。 クライアントによる SQL サーバーへのアクセスを許可するメッセージが表示された場合は、[SQL サーバー <your SQL server> の許可リスト IP <your IP Address>] を選択し、[OK] を選択します。
クエリ ウィンドウに、次の SQL スクリプトをコピーして実行します。
CREATE TABLE [dbo].[Fact_WindTurbineMetrics] ( [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, [MeasureTime] datetime NULL, [GeneratedPower] float NULL, [WindSpeed] float NULL, [TurbineSpeed] float NULL ) WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
データが作成されたことをチュートリアルの最後に確認できるように、このタブまたはウィンドウを開いたままにしておきます。
Azure Functions アプリを発行する
最初に、Azure portal から Functions アプリの発行プロファイルを取得します。 次に、発行プロファイルを使用して、Visual Studio から Azure Functions プロジェクトまたはアプリを発行します。
発行プロファイル名の取得
[リソース グループ] ページのリソースの一覧で、Azure Functions アプリを選択します。
アプリの [Function App] (Function アプリ) ページで、コマンド バーの [発行プロファイルの取得] を選択します。
ファイルをダウンロードして、EventHubsCaptureEventGridDemo フォルダーの FunctionEGDDumper サブフォルダーに保存します。
発行プロファイルを使用して Functions アプリを発行する
Visual Studio を起動します。
お客様が前提条件の一環として GitHub からダウンロードした EventHubsCaptureEventGridDemo.sln ソリューションを開きます。 これは
/samples/e2e/EventHubsCaptureEventGridDemo
フォルダーにあります。ソリューション エクスプローラーで、 [FunctionEGDWDumper] プロジェクトを右クリックし、 [発行] を選択します。
次の画面で、[開始] または [発行プロファイルの追加] を選択します。
[発行] ダイアログ ボックスの [対象] で [プロファイルのインポート] を選択し、[次へ] を選択します。
[プロファイルのインポート] タブで、先ほど FunctionEGDWDumper フォルダーに保存した発行設定ファイルを選択し、[完了] を選択します。
Visual Studio でプロファイルを構成している場合は、 [発行] を選択します。 発行が成功したことを確認します。
[Azure 関数] ページが開いている Web ブラウザーで、左側のメニューの [関数] を選択します。 EventGridTriggerMigrateData 関数が一覧に表示されていることを確認します。 表示されない場合は、もう一度 Visual Studio から発行し、ポータルのページを最新の状態に更新してください。
関数を発行すれば、イベントをサブスクライブする準備が整います。
イベントをサブスクライブする
Web ブラウザーの新しいタブまたはウィンドウで Azure portal にサインインします。
Azure portal で、左側のメニューにある [リソース グループ] を選択します。
お客様のリソース グループの名前を検索ボックスに入力して、リソース グループの一覧をフィルター処理します。
一覧で、お客様のリソース グループを選択します。
リソースの一覧から [Event Hubs 名前空間] を選択します。
[Event Hubs 名前空間] ページで、左側のメニューの [イベント] を選択し、ツール バーの [+ イベント サブスクリプション] を選択します。
[イベント サブスクリプションの作成] ページで、次の手順に従います。
イベント サブスクリプションの名前を入力します。
システム トピックの名前を入力します。 システム トピックは、送信者がイベントを送信するためのエンドポイントを提供します。 詳細については、システム トピックに関するページを参照してください
[エンドポイントのタイプ] で、 [Azure 関数] を選択します。
[エンドポイント] で、リンクを選択します。
[Azure 関数の選択] ページで、次のものが自動的に入力されない場合は、次の手順に従います。
- Azure 関数のある Azure サブスクリプションを選択します。
- 関数のリソース グループを選択します。
- 関数アプリを選択します。
- デプロイ スロットを選択します。
- EventGridTriggerMigrateData 関数を選択します。
[Azure 関数の選択] ページで、 [選択の確認] を選択します。
次に、 [イベント サブスクリプションの作成] ページに戻り、 [作成] を選択します。
イベント サブスクリプションが作成されていることを確認します。 Event Hubs 名前空間の [イベント] ページで [イベント サブスクリプション] タブに切り替えます。
リソース グループのリソースの一覧で、(App Service ではなく) App Service プランを選択します。
データを生成するアプリを実行する
イベント ハブ、専用 SQL プール (以前の SQL Data Warehouse)、Azure 関数アプリ、およびイベント サブスクリプションの設定が完了しました。 イベント ハブのデータを生成するアプリケーションを実行する前に、いくつかの値を構成する必要があります。
Azure portal で、先ほど移動したお客様のリソース グループに移動します。
Event Hubs 名前空間を選択します。
[Event Hubs 名前空間] ページで、左側のメニューの [共有アクセス ポリシー] を選択します。
ポリシーの一覧で RootManageSharedAccessKey を選択します。
[接続文字列 - 主キー] テキスト ボックスの隣にあるコピー ボタンを選択します。
お客様の Visual Studio ソリューションに戻ります。
[WindTurbineDataGenerator] プロジェクトを右クリックし、 [スタートアップ プロジェクトとして設定] を選択します。
WindTurbineDataGenerator プロジェクトで、program.cs を開きます。
<EVENT HUBS NAMESPACE CONNECTION STRING>
をポータルからコピーした接続文字列に置き換えます。イベント ハブに対して
hubdatamigration
以外の名前を使用した場合は、<EVENT HUB NAME>
をそのイベント ハブの名前に置き換えます。private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.windows.net/..."; private const string EventHubName = "hubdatamigration";
ソリューションをビルドします。 WindTurbineGenerator.exe アプリケーションを実行します。
数分後に、クエリ ウィンドウが開いている他のブラウザー タブで、データ ウェアハウスのテーブルに対してクエリを実行して、移行されたデータを確認します。
select * from [dbo].[Fact_WindTurbineMetrics]
ソリューションを監視する
このセクションは、ソリューションの監視またはトラブルシューティングに役立ちます。
ストレージ アカウントでキャプチャされたデータを表示する
リソース グループに移動し、イベント データのキャプチャに使用するストレージ アカウントを選択します。
[ストレージ アカウント] ページで、左側のメニューの [Storage Explorer (プレビュー)] を選択します。
[BLOB コンテナー] を展開し、 [windturbinecapture] を選択します。
右側のペインで、対象の Event Hubs 名前空間と同じ名前のフォルダーを開きます。
対象のイベント ハブと同じ名前のフォルダー (hubdatamigration) を開きます。
フォルダーをドリルスルーすると、AVRO ファイルが表示されます。 次に例を示します。
Event Grid トリガーによって関数が呼び出されたことを確認する
リソース グループに移動し、関数アプリを選択します。
左側のメニューの [関数] を選択します。
一覧から EventGridTriggerMigrateData 関数を選択します。
[関数] ページで、左側のメニューの [監視] を選択します。
呼び出しログをキャプチャするアプリケーション分析情報を構成するために、 [構成] を選択します。
新しい Application Insights リソースを作成するか、既存のリソースを選択します。
関数の [監視] ページに戻ります。
イベントを送信しているクライアント アプリケーション (WindTurbineDataGenerator) がまだ実行されていることを確認します。 実行されていない場合は、アプリを実行します。
数分 (5 分以上) 待ってから、 [最新の情報に更新] ボタンを選択して、関数の呼び出しを確認します。
呼び出しを選択すると、詳細が表示されます。
Event Grid により、イベント データがサブスクライバーに配信されます。 次の例は、イベント ハブを介したデータ ストリーミングが BLOB にキャプチャされるときに生成されるデータを示します。 特に、
data
オブジェクトのfileUrl
プロパティがストレージ内の BLOB を指すことに注意してください。 関数アプリによってこの URL が使用され、キャプチャされたデータを含む BLOB ファイルが取得されます。{ "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207", "subject": "hubdatamigration", "eventType": "Microsoft.EventHub.CaptureFileCreated", "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba", "data": { "fileUrl": "https://spehubstorage1207.blob.core.windows.net/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro", "fileType": "AzureBlockBlob", "partitionId": "0", "sizeInBytes": 473444, "eventCount": 2800, "firstSequenceNumber": 55500, "lastSequenceNumber": 58299, "firstEnqueueTime": "2020-12-07T21:49:12.556Z", "lastEnqueueTime": "2020-12-07T21:50:11.534Z" }, "dataVersion": "1", "metadataVersion": "1", "eventTime": "2020-12-07T21:50:12.7065524Z" }
データが専用 SQL プールに格納されていることを確認する
クエリ ウィンドウが開いているブラウザー タブで、専用 SQL プールのテーブルに対してクエリを実行して、移行されたデータを確認します。
次のステップ
- サンプルの設定と実行に関する詳細については、Event Hubs Capture と Event Grid のサンプルに関する記事をご覧ください。
- このチュートリアルでは、
CaptureFileCreated
イベントに対するイベント サブスクリプションを作成しました。 このイベントと Azure Blob Storage がサポートするすべてのイベントの詳細については、「Event Grid ソースとしての Azure Event Hub」を参照してください。 - Event Hubs のキャプチャ機能の詳細については、「Azure Event Hubs で Azure Blob Storage または Azure Data Lake Storage にイベントをキャプチャする」を参照してください。