次の方法で共有


Eventstream REST API

Microsoft Fabric REST API を使用すると、Fabric の手順とプロセスを自動化でき、組織がより効率的かつ正確にタスクを完了するのに役立ちます。 これらのワークフローを自動化することで、エラーを減らし、生産性を向上させ、運用全体でコスト削減を実現できます。

Fabric では、項目は、特定のエクスペリエンス内の一連の機能を表します。 たとえば、Eventstream はリアルタイム インテリジェンス エクスペリエンス内の項目です。 Fabric の各項目は、項目定義 (項目を構成する構造、形式、主要なコンポーネントの概要を示すオブジェクト) によって定義されます。

この記事では、Fabric ワークスペース内で Eventstream 項目を作成および管理するために Microsoft Fabric REST API を使用する方法について包括的なガイドを提供します。 各 Eventstream API 操作の詳細な仕様と、API 呼び出しを設定および構成する手順を確認できます。

Microsoft Fabric REST API の概要すべてについては、「Microsoft Fabric REST APIの使用」を参照してください。

サポートされている Eventstream の API

現在、Eventstream では次の定義に基づく API がサポートされています。

API 説明
定義を使用して Eventstream 項目を作成する ソース、宛先、演算子、ストリームなど、トポロジに関する詳細情報を含む Eventstream 項目をワークスペースに作成するために使用します。
Eventstream 項目定義を取得する ソース、宛先、演算子、ストリームなど、トポロジに関する詳細情報を含む Eventstream 項目定義を取得するために使用します。
Eventstream 項目定義を更新する ソース、宛先、演算子、ストリームを含む Eventstream 項目定義を更新または編集するために使用します。

CRUD 操作を使用して Eventstream 項目を管理するには、「Fabric REST API - Eventstream」にアクセスしてください。 これらの API では、次の操作がサポートされます。

  • Eventstream の作成
  • Eventstream の削除
  • Eventstream の取得
  • Eventstreams の一覧表示
  • Eventstream の更新

Eventstream API を呼び出す方法

手順 1: Fabric に対する認証

Fabric API を使用するには、まず、Fabric サービス用の Microsoft Entra トークンを取得してから、API 呼び出しの Authorization ヘッダーでそのトークンを使用する必要があります。 Microsoft Entra トークンを取得するには、2 つのオプションがあります。

オプション 1: MSAL.NETを使用してトークンを取得する

アプリケーションがサービス プリンシパルを使用して Fabric API にアクセスする必要がある場合、MSAL.NET ライブラリを使用してアクセス トークンを取得できます。 Fabric API クイックスタートに従って、C# コンソール アプリを作成します。これで、MSAL.Net ライブラリを使用して Azure AD (AAD) トークンを取得し、C# HttpClient を使用して List ワークスペース API を呼び出します。

オプション 2: Fabric Portal を使用してトークンを取得する

Azure AD トークンを使用して、Fabric API の認証とテストを行うことができます。 テストするテナントの Fabric Portal にサインインし、F12 キーを押してブラウザーの開発者モードに入ります。 そこのコンソールで、次を実行します。

powerBIAccessToken

トークンをコピーしてアプリケーションに貼り付けます。

手順 2: JSON で Eventstream 本文を準備する

API 要求で base64 に変換される JSON ペイロードを作成します。 Eventstream 項目の定義は、グラフ状の構造であり、次のコンポーネントで構成されます。

フィールド 説明
Sources Eventstream に取り込んで処理できるデータ ソース。 サポートされるデータ ソースには、Azure ストリーミング ソース、サードパーティのストリーミング ソース、データベース CDC (変更データ キャプチャ)、Azure Blob Storage イベント、Fabric システム イベントなどがあります。
Destinations 処理されたデータをルーティングできる Fabric 内のエンドポイント (レクハウス、Eventhouse、Reflex など)。
演算子 リアルタイム データ ストリームを処理するイベント プロセッサ (フィルター、集計、グループ化、結合など)。
ストリーム リアルタイム ハブでサブスクリプションと分析に使用できるデータ ストリーム。 ストリームには、既定のストリームと派生ストリームの 2 種類があります。

GitHub の API テンプレートを使って、Eventstream 本文を定義するのに役立ててください。

Eventstream 項目の定義の詳細については、「Eventstream 項目の定義」セクション 参照してください。

JSON での Eventstream 定義の例:

{
  "sources": [
    {
      "name": "SqlServerOnVmDbCdc",
      "type": "SQLServerOnVMDBCDC",
      "properties":
      {
        "dataConnectionId": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
        "tableName": ""
      }
    }
  ],
  "destinations": [
    {
      "name": "Lakehouse",
      "type": "Lakehouse",
      "properties":
      {
        "workspaceId": "bbbb1111-cc22-3333-44dd-555555eeeeee",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "schema": "",
        "deltaTable": "newTable",
        "minimumRows": 100000,
        "maximumDurationInSeconds": 120,
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "derivedStream"}]
    }
  ],
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "SqlServerOnVmDbCdc"}]
    },
    {
      "name": "derivedStream",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "GroupBy"}]
    }
  ],
  "operators": [
    {
      "name": "GroupBy",
      "type": "GroupBy",
      "inputNodes": [{"name": "myEventstream-stream"}],
      "properties":
      {
        "aggregations": [
          {
            "aggregateFunction": "Average",
            "column":
            {
              "expressionType": "ColumnReference",
              "node": null,
              "columnName": "payload",
              "columnPathSegments": [{"field": "ts_ms"}]
            },
            "alias": "AVG_ts_ms"
          }
        ],
        "groupBy": [],
        "window":
        {
          "type": "Tumbling",
          "properties":
          {
            "duration":
            {
              "value": 5,
              "unit": "Minute"
            },
            "offset":
            {
              "value": 1,
              "unit": "Minute"
            }
          }
        }
      }
    }
  ],
  "compatibilityLevel": "1.0"
}

手順 3: Eventstream JSON の base64 文字列を作成する

Base64 エンコードおよびデコード などのツールを使用して、Eventstream JSON を base64 文字列に変換します。

Eventstream JSON を base64 文字列にエンコードするスクリーンショット。

手順 4: API 要求本文を作成する

前の手順で Base64 でエンコードされた Eventstream JSON を API 要求本文のコンテンツとして使用します。

Base64 でエンコードされた文字列を含むペイロードの例を次に示します。

{
 "definition": {
  "parts": [
   {
    "path": "eventstream.json",
    "payload": "ewogICJzb3VyY2VzIjogWwogICAgewogICAgICAibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMiLAogICAgICAidHlwZSI6ICJTUUxTZXJ2ZXJPblZNREJDREMiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiZGF0YUNvbm5lY3Rpb25JZCI6ICJhYWFhYWFhYS0wMDAwLTExMTEtMjIyMi1iYmJiYmJiYmJiYmIiLAogICAgICAgICJ0YWJsZU5hbWUiOiAiIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVzdGluYXRpb25zIjogWwogICAgewogICAgICAibmFtZSI6ICJMYWtlaG91c2UiLAogICAgICAidHlwZSI6ICJMYWtlaG91c2UiLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAid29ya3NwYWNlSWQiOiAiYmJiYjExMTEtY2MyMi0zMzMzLTQ0ZGQtNTU1NTU1ZWVlZWVlIiwKICAgICAgICAiaXRlbUlkIjogImNjY2MyMjIyLWRkMzMtNDQ0NC01NWVlLTY2NjY2NmZmZmZmZiIsCiAgICAgICAgInNjaGVtYSI6ICIiLAogICAgICAgICJkZWx0YVRhYmxlIjogIm5ld1RhYmxlIiwKICAgICAgICAibWluaW11bVJvd3MiOiAxMDAwMDAsCiAgICAgICAgIm1heGltdW1EdXJhdGlvbkluU2Vjb25kcyI6IDEyMCwKICAgICAgICAiaW5wdXRTZXJpYWxpemF0aW9uIjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJKc29uIiwKICAgICAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAgICAgIHsKICAgICAgICAgICAgImVuY29kaW5nIjogIlVURjgiCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICB9LAogICAgICAiaW5wdXROb2RlcyI6IFt7Im5hbWUiOiAiZGVyaXZlZFN0cmVhbSJ9XQogICAgfQogIF0sCiAgInN0cmVhbXMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIiwKICAgICAgInR5cGUiOiAiRGVmYXVsdFN0cmVhbSIsCiAgICAgICJwcm9wZXJ0aWVzIjoKICAgICAge30sCiAgICAgICJpbnB1dE5vZGVzIjogW3sibmFtZSI6ICJTcWxTZXJ2ZXJPblZtRGJDZGMifV0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogImRlcml2ZWRTdHJlYW0iLAogICAgICAidHlwZSI6ICJEZXJpdmVkU3RyZWFtIiwKICAgICAgInByb3BlcnRpZXMiOgogICAgICB7CiAgICAgICAgImlucHV0U2VyaWFsaXphdGlvbiI6CiAgICAgICAgewogICAgICAgICAgInR5cGUiOiAiSnNvbiIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJlbmNvZGluZyI6ICJVVEY4IgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIkdyb3VwQnkifV0KICAgIH0KICBdLAogICJvcGVyYXRvcnMiOiBbCiAgICB7CiAgICAgICJuYW1lIjogIkdyb3VwQnkiLAogICAgICAidHlwZSI6ICJHcm91cEJ5IiwKICAgICAgImlucHV0Tm9kZXMiOiBbeyJuYW1lIjogIm15RXZlbnRzdHJlYW0tc3RyZWFtIn1dLAogICAgICAicHJvcGVydGllcyI6CiAgICAgIHsKICAgICAgICAiYWdncmVnYXRpb25zIjogWwogICAgICAgICAgewogICAgICAgICAgICAiYWdncmVnYXRlRnVuY3Rpb24iOiAiQXZlcmFnZSIsCiAgICAgICAgICAgICJjb2x1bW4iOgogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImV4cHJlc3Npb25UeXBlIjogIkNvbHVtblJlZmVyZW5jZSIsCiAgICAgICAgICAgICAgIm5vZGUiOiBudWxsLAogICAgICAgICAgICAgICJjb2x1bW5OYW1lIjogInBheWxvYWQiLAogICAgICAgICAgICAgICJjb2x1bW5QYXRoU2VnbWVudHMiOiBbeyJmaWVsZCI6ICJ0c19tcyJ9XQogICAgICAgICAgICB9LAogICAgICAgICAgICAiYWxpYXMiOiAiQVZHX3RzX21zIgogICAgICAgICAgfQogICAgICAgIF0sCiAgICAgICAgImdyb3VwQnkiOiBbXSwKICAgICAgICAid2luZG93IjoKICAgICAgICB7CiAgICAgICAgICAidHlwZSI6ICJUdW1ibGluZyIsCiAgICAgICAgICAicHJvcGVydGllcyI6CiAgICAgICAgICB7CiAgICAgICAgICAgICJkdXJhdGlvbiI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiA1LAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfSwKICAgICAgICAgICAgIm9mZnNldCI6CiAgICAgICAgICAgIHsKICAgICAgICAgICAgICAidmFsdWUiOiAxLAogICAgICAgICAgICAgICJ1bml0IjogIk1pbnV0ZSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIF0sCiAgImNvbXBhdGliaWxpdHlMZXZlbCI6ICIxLjAiCn0",
    "payloadType": "InlineBase64"
   },
   {
    "path": ".platform",
    "payload": "ewogICIkc2NoZW1hIjogImh0dHBzOi8vZGV2ZWxvcGVyLm1pY3Jvc29mdC5jb20vanNvbi1zY2hlbWFzL2ZhYnJpYy9naXRJbnRlZ3JhdGlvbi9wbGF0Zm9ybVByb3BlcnRpZXMvMi4wLjAvc2NoZW1hLmpzb24iLAogICJtZXRhZGF0YSI6IHsKICAgICJ0eXBlIjogIkV2ZW50c3RyZWFtIiwKICAgICJkaXNwbGF5TmFtZSI6ICJhbGV4LWVzMSIKICB9LAogICJjb25maWciOiB7CiAgICAidmVyc2lvbiI6ICIyLjAiLAogICAgImxvZ2ljYWxJZCI6ICIwMDAwMDAwMC0wMDAwLTAwMDAtMDAwMC0wMDAwMDAwMDAwMDAiCiAgfQp9",
    "payloadType": "InlineBase64"
   }
  ]
 }
}

手順 5: API を使用して Eventstream 項目を作成する

アプリケーションで、ペイロードに Base64 でエンコードされた文字列を含む Eventstream 項目を作成する要求を送信します。

PowerShell の例:

$evenstreamAPI = "https://api.fabric.microsoft.com/v1/workspaces/$workspaceId/items" 

## Invoke the API to create the Eventstream
Invoke-RestMethod -Headers $headerParams -Method POST -Uri $evenstreamAPI -Body ($body) -ContentType "application/json"

Eventstream 項目の定義

Eventstream 項目定義には、ソース、宛先、演算子、ストリームの 4 つのコンポーネントで構成されるグラフのような構造があります。

ソース

API 本文に Eventstream のソースを定義するには、各フィールドとプロパティを表に従って正しく指定してください。

フィールド Type 説明 要件 指定できる値/形式
id 文字列 (UUID) ソースの一意識別子 (システムによって生成される)。 CREATE では省略可能、UPDATE では必須 UUID 形式
name String ソースの一意の名前 (Eventstream 内での識別に使用される)。 必須 任意の有効な文字列
type 文字列 (列挙型) ソースの種類を指定します。 定義済みの値のいずれかと一致する必要があります。 必須 AmazonKinesis, AmazonMSKKafka, ApacheKafka, AzureCosmosDBCDC, AzureEventHub, AzureIoTHub, AzureSQLDBCDCAzureSQLMIDBCDC, ConfluentCloud, CustomEndpoint, GooglePubSub, MySQLCDC, PostgreSQLCDC, SampleData, FabricWorkspaceItemEvents, FabricJobEvents, FabricOneLakeEvents
properties Object 選択したソースの種類に固有のその他の設定。 必須 AzureEventHub の種類の例: dataConnectionIdconsumerGroupNameinputSerialization

API 本文の Eventstream ソースの例:

{
  "sources": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "AzureEventHubSource",
      "type": "AzureEventHub",
      "properties":
      {
        "dataConnectionId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "consumerGroupName": "$Default",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      }
    }
  ]
}

Destinations

API 本文に Eventstream の宛先を定義するには、各フィールドとプロパティを表に従って正しく指定してください。

フィールド Type 説明 要件 指定できる値/形式
id 文字列 (UUID) 宛先の一意識別子 (システムによって生成される)。 CREATE では省略可能、UPDATE では必須 UUID 形式
name String 宛先の一意の名前 (Eventstream 内での識別に使用される)。 必須 任意の有効な文字列
type 文字列 (列挙型) 宛先の種類を指定します。 定義済みの値のいずれかと一致する必要があります。 必須 "CustomEndpoint""Eventhouse""Lakehouse"
properties Object 選択した宛先の種類に固有のその他の設定。 必須 Eventhouse の種類の例: "dataIngestionMode""workspaceId""itemId""databaseName"
inputNodes Array Eventstream 名や演算子名など、宛先の入力ノードへの参照。 必須 例: eventstream-1

API 本文の Eventstream ソースの例:

{
  "destinations": [
    {
      "id": "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb",
      "name": "EventhouseDestination",
      "type": "Eventhouse",
      "properties":
      {
        "dataIngestionMode": "ProcessedIngestion",
        "workspaceId": "bbbbbbbb-1111-2222-3333-cccccccccccc",
        "itemId": "cccc2222-dd33-4444-55ee-666666ffffff",
        "databaseName": "myeventhouse",
        "tableName": "mytable",
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "eventstream-1"}]
    }
  ]
}

演算子

API 本文に Eventstream の演算子を定義するには、各フィールドとプロパティを表に従って正しく指定してください。

フィールド Type 説明 要件 指定できる値/形式
name String 演算子の一意の名前。 必須 任意の有効な文字列
type 文字列 (列挙型) 演算子の種類を示します。 定義済みの値のいずれかと一致する必要があります。 必須 "Filter""Join""ManageFields""Aggregate""GroupBy""Union""Expand"
properties Object 選択した演算子の種類に固有のその他の設定。 必須 Filter の種類の例: "conditions"
inputNodes Array 演算子の入力ノードへの参照のリスト。 必須 例: eventstream-1
inputSchemas Array 演算子の入力ノードへの参照のリスト。 省略可能 Filter の種類の例: "schema"

API 本文の Eventstream 演算子の例:

{
  "operators": [
    {
      "name": "FilterName",
      "type": "Filter",
      "inputNodes": [{"name": "eventstream-1"}],
      "properties":
      {
        "conditions": [
          {
            "column":
            {
              "node": "nodeName",
              "columnName": "columnName",
              "columnPath": ["path","to","column"]
            },
            "operator": "Equals",
            "value":
            {
              "dataType": "nvarchar(max)",
              "value": "stringValue"
            }
          }
        ]
      }
    }
  ]
}

ストリーム

API 本文にストリームを定義するには、各フィールドとプロパティを表に従って正しく指定してください。

フィールド Type 説明 要件 指定できる値/形式
id 文字列 (UUID) ストリームの一意識別子 (システムによって生成される)。 省略可能 UUID 形式
name String ストリームの一意の名前。 必須 任意の有効な文字列
type 文字列 (列挙型) ストリームの種類を指定します。 定義済みの値のいずれかと一致する必要があります。 必須 "DefaultStream""DerivedStream"
properties Object 選択したストリームの種類に固有のその他の設定。 必須 Filter の種類の例: "conditions"
inputNodes Array ストリームの入力ノードへの参照のリスト。 省略可能 例: []"eventstream-1"

API 本文のストリームの例:

{
  "streams": [
    {
      "name": "myEventstream-stream",
      "type": "DefaultStream",
      "properties":
      {},
      "inputNodes": [{"name": "sourceName"}]
    },
    {
      "name": "DerivedStreamName",
      "type": "DerivedStream",
      "properties":
      {
        "inputSerialization":
        {
          "type": "Json",
          "properties":
          {
            "encoding": "UTF8"
          }
        }
      },
      "inputNodes": [{"name": "FilterName"}]
    }
  ]
}