次の方法で共有


Real-Time Intelligence チュートリアル パート 2: デジタル ツイン ビルダー (プレビュー) におけるストリーミングデータの取得と処理

チュートリアルのこの部分では、別の種類のサンプル データ (バスの場所に関する時系列情報を含むサンプル バス データのリアルタイム データ ストリーム) を設定します。 サンプル データをイベントハウスにストリーミングし、データに対していくつかの変換を実行してから、前のセクションで作成したサンプル データ lakehouse にイベントハウス データを取得するショートカットを作成します。 デジタル ツイン ビルダーでは、データをレイクハウスに格納する必要があります。

イベントハウスを作成する

  1. チュートリアルリソースを作成するワークスペースを参照します。 すべてのリソースを同じワークスペースに作成する必要があります。
  2. + 新規アイテム を選択します。
  3. [項目の種類でフィルター処理する] 検索ボックスに「Eventhouse」と入力します。
  4. Eventhouse 項目を選択します。
  5. イベントハウス名として「Tutorial」と入力します。 KQL データベースは、同じ名前で同時に作成されます。
  6. を選択してを作成します。 プロビジョニングが完了すると、イベントハウス の [システムの概要] ページが表示されます。

eventstream を作成する

このセクションでは、サンプル のバス ストリーミング データを eventhouse に送信するイベントストリームを作成します。

ソースの追加

イベントストリームを作成し、 Buses サンプル データをソースとして追加するには、次の手順に従います。

  1. イベントハウスの [KQL データベース ] ウィンドウで、新しい Tutorial データベースを選択します。

  2. メニュー リボンから [データの 取得 ] を選択し、[ Eventstream] > [新しいイベントストリーム] を選択します。

    Tutorial データベースの新しいイベントストリームを取得するスクリーンショット。

  3. eventstream BusEventstream に名前を付けます。 イベントストリームの作成が完了すると、イベントストリームが開きます。

  4. [サンプル データの使用] を選択します。

    eventstream のサンプル データを選択するスクリーンショット。

  5. [ ソースの追加] ページで、ソース名として 「BusDataSource 」と入力します。 サンプル データバスを選択します。 [] を選択し、[] を追加します。

    バス サンプル データの選択のスクリーンショット。

    新しいイベントストリームの準備ができたら、作成キャンバスで開きます。

データを変換する

このセクションでは、受信サンプル データに 1 つの変換を追加します。 この手順では、文字列フィールドの ScheduleTimeTimestamp を DateTime 型にキャストし、わかりやすくするために Timestamp の名前を ArrivalTime に変更します。 タイムスタンプ フィールドを時系列データとして使用するには、デジタル ツイン ビルダー (プレビュー) のタイムスタンプ フィールドを DateTime 形式にする必要があります。

データ変換を追加するには、次の手順に従います。

  1. [ イベントの変換] タイルまたは [変換先の追加 ] タイルの下矢印を選択し、[ フィールドの管理 ] の定義済み操作を選択します。 タイルの名前が ManageFields に変更されます。

    [フィールドの管理] 操作の選択のスクリーンショット。

  2. MangeFields タイルの編集アイコン (鉛筆のような形) を選択すると、[フィールドの管理] ウィンドウが開きます。

  3. [ すべてのフィールドの追加] を選択します。 このアクションにより、変換によってソース データのすべてのフィールドが確実に存在します。

  4. [タイムスタンプ] フィールドを選択します。 [種類の変更][はい] に切り替えます。 [ 変換された型] で、ドロップダウン リストから DateTime を選択します。 [名前]、ActualTime の新しい名前を入力します。

    [タイムスタンプ] フィールドの変更のスクリーンショット。

  5. 保存せずに、[ ScheduleTime ] フィールドを選択します。 [種類の変更][はい] に切り替えます。 [ 変換された型] で、ドロップダウン リストから DateTime を選択します。 名前は ScheduleTime のままにします

    [ 保存] を選択します。

  6. [ フィールドの管理 ] ウィンドウが閉じます。 ManageFields タイルは、宛先に接続するまでエラーを表示し続けます。

宛先を追加する

  1. メニュー リボンから [宛先の 追加] を選択し、[ Eventhouse] を選択します。

    イベントハウスの宛先を選択するスクリーンショット。

  2. [Eventhouse]\(イベントハウス\) ウィンドウに次の情報を入力します。

    フィールド 価値
    データ インジェスト モード インジェスト前イベント処理
    宛先名 TutorialDestination
    ワークスペース リソースを作成したワークスペースを選択します。
    Eventhouse チュートリアル
    KQL データベース チュートリアル
    KQL 宛先テーブル 新規作成 - テーブル名として bus_data_raw を入力します
    入力データ形式 Json
  3. [データ ソースの追加後にインジェストをアクティブ化する] ボックスがオンになっていることを確認します。

  4. 保存 を選択します。

  5. 作成キャンバスで、[ ManageFields ] タイルを選択し、矢印を TutorialDestination タイルにドラッグして接続します。 このアクションにより、フロー内のすべてのエラー メッセージが解決されます。

  6. メニュー リボンから [発行] を選択 します。 これで、eventstream はサンプル ストリーミング データの eventhouse への送信を開始します。

  7. 数分後、Eventstream ビューの TutorialDestination カードには、[ データ プレビュー ] タブにサンプル データが表示されます。データが届くのを待っている間に、プレビューを数回更新することが必要になる場合があります。

    プレビュー データのスクリーンショット。

  8. イベントハウスでデータ テーブルがアクティブであることを確認します。 チュートリアル KQL データベースに移動し、ビューを更新します。 データを含む bus_data_raw というテーブルが含まれるようになりました。

    データを含む bus_data_raw テーブルのスクリーンショット。

更新ポリシーを使用してデータを変換する

バス ストリーミング データが KQL データベースに格納されたので、関数と Kusto 更新ポリシー を使用してデータをさらに変換できます。 このセクションで実行する変換では、デジタル ツイン ビルダー (プレビュー) で使用するデータを準備し、次のアクションを含めます。

  • JSON フィールド Properties を、含まれているデータ項目、 BusStatusTimeToNextStationごとに個別の列に分割します。 デジタル ツイン ビルダーには JSON 解析機能がないため、データがデジタル ツイン ビルダーに送信される前に、これらの値を分離する必要があります。
  • StopCodeを追加します。これは、各バス停を表す一意のキーです。 この手順の目的は、このチュートリアル シナリオをサポートするためにサンプル データ セットを完了することです。 個別のデータ ソースからの結合可能なエンティティ インスタンスには、デジタル ツイン ビルダーがそれらをリンクするために使用できる共通の列が含まれている必要があります。そのため、この手順では、静的バスストップ データ セットの Stop_Code フィールドに一致するシミュレートされた int 値のセットを追加します。 現実の世界では、関連するデータ セットには既に何らかの共通点が含まれています。
  • 変換されたバス データを含む bus_data_processed という名前の新しいテーブルを作成します。
  • ショートカットを使用して Tutorial Lakehouse のデータにアクセスできるように、新しいテーブルに対して OneLake の可用性を有効にします。

変換クエリを実行するには、次の手順に従います。

  1. イベントハウス内の Tutorial KQL データベースを選択します。 メニュー リボンから[ コードを含むクエリ]を選択すると、KQL クエリ エディターが開きます。

    データベースの [コードを含むクエリ] ボタンのスクリーンショット。

  2. 次のコードをコピーしてクエリ エディターに貼り付けます。 各コード ブロックを順番に実行します。

    // Set columns
    .create-or-alter function extractBusData ()
    {
        bus_data_raw
        | extend BusState = tostring(todynamic(Properties).BusState)
            , TimeToNextStation = tostring(todynamic(Properties).TimeToNextStation)
            , StopCode = toint(10000 + abs(((toint(BusLine) * 100) + toint(StationNumber)) % 750))
        | project-away Properties
    }
    
    // Create table
    .create table bus_data_processed (ActualTime:datetime, TripId:string, BusLine:string, StationNumber:string, ScheduleTime:datetime, BusState:string, TimeToNextStation:string, StopCode:int)
    
    //Load data into table
    .alter table bus_data_processed policy update
    ```
    [{
        "IsEnabled": true,
        "Source": "bus_data_raw",
        "Query": "extractBusData",
        "IsTransactional": false,
        "PropagateIngestionProperties": true
    }]
    ```
    
    // Enable OneLake availability
    .alter-merge table bus_data_processed policy mirroring dataformat=parquet with (IsEnabled=true, TargetLatencyInMinutes=5)
    

    ヒント

    コードを使用する代わりに、UI を使用して新しいテーブルの OneLake 可用性を有効にすることもできます。 テーブルを選択し、 OneLake の可用性を切り替えます。

    UI で OneLake の可用性を有効にするスクリーンショット。

    UI オプションでは、既定の待機時間は、データの量に応じて 15 分から数時間です。 待機時間を 5 分に短縮するには、前のコード ブロックに示すように .alter-merge table コマンドを使用します。

  3. 必要に応じて、[クエリ] タブを Bus データ処理 として保存して、後で識別できるようにします。

  4. bus_data_processedという名前の新しいテーブルがデータベースに作成されます。 しばらく待つと、処理されたバス データの設定が開始されます。

    データを含む bus_data_processed テーブルのスクリーンショット。

Lakehouse ショートカットを作成する

最後に、デジタル ツイン ビルダー (プレビュー) のサンプル データを保持する Tutorial lakehouse で処理されたバス データを公開するショートカットを作成します。 デジタル ツイン ビルダーではデータ ソースがレイクハウスである必要があるため、この手順が必要です。

  1. チュートリアルの lakehouse に移動してください (パート1「コンテキストデータをアップロードする」で以前に作成したものです)。 メニュー リボンから、[データの取得] >[新しいショートカット] を選択します。

    [新しいショートカット] ボタンのスクリーンショット。

  2. [内部ソース] で、[Microsoft OneLake] を選びます。 次に、 チュートリアル KQL データベースを選択します。

  3. テーブルの一覧を展開し、bus_data_processedの横にあるチェック ボックスをオンにします。 [次へ] を選択します。

  4. ショートカットの詳細を確認し、[ 作成] を選択します。

    ショートカットの作成のスクリーンショット。

  5. bus_data_processedテーブルがレイクハウスで利用できるようになりました。 データが含まれていることを確認します (これには数分かかる場合があります)。

    レイクハウスのbus_data_processedのスクリーンショット。

次に、この lakehouse データをソースとして使用して、デジタル ツイン ビルダーでオントロジを構築します。

次のステップ