共用方式為


在 Azure IoT 作業中建立數據流

重要事項

本頁面包含使用 Kubernetes 部署資訊清單 (處於預覽狀態) 來管理 Azure IoT 操作元件的指示。 此功能以數項限制提供,不應用於生產工作負載。

請參閱 Microsoft Azure 預覽版增補使用規定,以了解適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的法律條款。

資料流程是透過選用轉換從來源到目的地取得資料的路徑。 您可以藉由建立 數據流 自訂資源或使用作業體驗 Web UI 來設定資料流。 資料流程由三個部分組成:來源轉換,以及目的地

資料流程的圖表,其中顯示從來源到轉換再到目的地的流程。

若要定義來源和目的地,您必須設定資料流程端點。 轉換是選擇性的,可以包含擴充資料、篩選資料,以及將資料對應至另一個欄位等作業。

重要事項

每個資料流程都必須將 Azure IoT Operations 本機 MQTT 代理程式的預設端點作為來源或目的地

您可以使用 Azure IoT 操作中的作業體驗來建立資料流程。 作業體驗提供視覺化介面來設定資料流程。 您也可以使用 Bicep 來使用 Bicep 檔案建立數據流,或使用 Kubernetes 使用 YAML 檔案建立數據流。

繼續閱讀以了解如何設定來源、轉換和目的地。

先決條件

一旦您擁有 Azure IoT 操作的實例,就可以使用預設資料流程設定檔和端點來部署資料流程。 不過,您可能想要設定資料流程設定檔和端點來自訂資料流程。

資料流程設定檔

如果不需要對資料流程進行不同的縮放設定,請使用 Azure IoT 操作所提供的預設資料流程設定檔。 您應該避免將太多數據流與單一數據流配置檔產生關聯。 如果您有大量的數據流,請將它們分散到多個數據流配置檔,以減少超過數據流配置檔組態大小限制的風險。

若要瞭解如何設定新的資料流配置檔,請參閱 設定數據流配置檔

資料流程端點

需要資料流程端點來設定資料流程的來源和目的地。 若要快速開始,您可以使用本機 MQTT 代理程式的預設資料流程端點。 您也可以建立其他類型的資料流程端點,例如 Kafka、事件中樞或 Azure Data Lake Storage。 若要了解如何設定每種資料流程端點的類型,請參閱設定資料流程端點

開始使用

一旦擁有先決條件後,您就可以開始建立資料流程。

  1. 若要在作業體驗中建立資料流程,請選取 [資料流程]>[建立資料流程]

  2. 選取佔位元名稱 new-data-flow 以設定數據流屬性。 輸入數據流的名稱,然後選擇要使用的數據流配置檔。 預設數據流配置檔會自動被選取。 如需數據流配置檔的詳細資訊,請參閱 設定數據流配置檔

    作業體驗介面的螢幕快照,其中用戶為數據流命名,併為其選取配置檔。

    重要事項

    您只能在建立資料流程時選擇資料流程設定檔。 建立資料流之後,您無法變更數據流配置檔。 如果您想要變更現有資料流的數據流配置檔,請刪除原始數據流,並使用新的數據流配置檔建立新的資料流配置檔。

  3. 選取數據流程圖中的專案,以設定數據流的來源、轉換和目的地端點。

    作業體驗介面的螢幕快照,其中顯示具有來源端點、轉換階段和目的地端點的數據流圖。

檢閱下列各節以了解如何設定資料流程的作業類型。

來源

若要設定資料流程的來源,請指定端點參考和端點的資料來源清單。 選擇下列其中一個選項作為資料流程的來源。

如果預設端點未作為來源使用,則必須將其用作目的地。 若要深入瞭解如何使用本機 MQTT 訊息代理程式端點,請參閱 數據流必須使用本機 MQTT 訊息代理程式端點

選項 1:使用預設訊息代理程式端點作為來源

  1. 在 [來源詳細資料] 下,選取 [訊息代理程式]

    作業體驗介面的螢幕快照,其中顯示訊息代理程式選取為數據流的來源端點。

  2. 針對訊息代理程式來源輸入以下設定:

    設定 描述
    資料流程端點 選取 [預設] 以使用預設 MQTT 訊息代理程式端點。
    主題 用於訂閱傳入訊息的主題篩選條件。 使用 [主題]> [新增資料列] 來新增多個主題。 如需主題的詳細資訊,請參閱設定 MQTT 或 Kafka 主題
    訊息結構描述 用於反序列化傳入訊息的結構描述。 請參閱指定結構描述來還原序列化資料
  3. 選取 [套用]

因為 dataSources 可讓您指定 MQTT 或 Kafka 主題而無需修改端點組態,所以即使主題不同,您也可以將端點重複用於多個資料流程。 若要深入了解,請參閱設定資料來源

選項 2:使用資產作為來源

您可以使用資產作為資料流程的來源。 使用資產作為來源僅在作業體驗中可用。

  1. 在 [來源詳細資料]下,選取 [資產]

  2. 選取您要用作來源端點的資產。

  3. 選取 [繼續]

    即會顯示所選取資產的資料點清單。

    使用作業體驗來選取資產作為來源端點的螢幕擷取畫面。

  4. 選取 [套用] 以使用資產作為來源端點。

使用資產作為來源時,會使用資產定義來推斷資料流程的結構描述。 資產定義包含資產資料點的結構描述。 若要深入了解,請參閱從遠端管理資產組態

設定之後,資產中的資料會透過本機 MQTT 代理程式到達資料流程。 因此,當使用資產作為來源時,資料流程實際上會使用本機 MQTT 代理程式預設端點作為來源。

選項 3:使用自訂 MQTT 或 Kafka 資料流程端點作為來源

如果您已建立自訂 MQTT 或 Kafka 資料流程端點 (例如,以搭配事件方格或事件中樞使用),則可以將其用作資料流程的來源。 請記住,儲存體類型端點 (如 Data Lake 或 Fabric OneLake) 不能用作來源。

  1. 在 [來源詳細資料] 下,選取 [訊息代理程式]

    使用作業體驗來選取自訂訊息代理程式作為來源端點的螢幕擷取畫面。

  2. 針對訊息代理程式來源輸入以下設定:

    設定 描述
    資料流程端點 使用 [重新選取] 按鈕來選取自訂 MQTT 或 Kafka 資料流程端點。 如需詳細資訊,請參閱設定 MQTT 資料流程端點設定 Azure 事件中樞和 Kafka 資料流程端點
    主題 用於訂閱傳入訊息的主題篩選條件。 使用 [主題]> [新增資料列] 來新增多個主題。 如需主題的詳細資訊,請參閱設定 MQTT 或 Kafka 主題
    訊息結構描述 用於反序列化傳入訊息的結構描述。 請參閱指定結構描述來還原序列化資料
  3. 選取 [套用]

設定資料來源 (MQTT 或 Kafka 主題)

您可以在一個來源中指定多個 MQTT 或 Kafka 主題,而無需修改資料流程端點組態。 這種彈性表示即使主題不同,也可以在多個資料流程中重複使用相同的端點。 如需詳細資訊,請參閱重複使用資料流程端點

MQTT 主題

當來源是 MQTT (包含事件方格) 端點時,您可以使用 MQTT 主題篩選條件來訂閱傳入訊息。 主題篩選條件可以包含萬用字元來訂閱多個主題。 例如, thermostats/+/sensor/temperature/# 訂閱控溫器的所有溫度感測器訊息。 若要設定 MQTT 主題篩選條件:

在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式],然後使用 [主題] 欄位來指定要訂閱傳入訊息的 MQTT 主題篩選條件。 您可以選取 [新增資料列] 並輸入新主題,以新增多個 MQTT 主題。

共用訂閱

若要搭配訊息代理程式來源使用共用訂閱,您可以使用 $shared/<GROUP_NAME>/<TOPIC_FILTER>的形式來指定共用訂閱主題。

在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式] 並使用 [主題] 欄位來指定共用訂閱群組和主題。

如果資料流程設定檔中的實例計數大於一,則會自動為使用訊息代理程式來源的所有資料流程啟用共用訂閱。 在此情況下,會新增 $shared 前置詞並自動產生共用訂閱群組名稱。 例如,如果您有執行個體計數為 3 的資料流程設定檔,且您的資料流程使用訊息代理程式端點作為以主題 topic1topic2 設定的來源,它們會自動轉換成共用訂閱 $shared/<GENERATED_GROUP_NAME>/topic1$shared/<GENERATED_GROUP_NAME>/topic2

您可以在組態中明確建立名為 $shared/mygroup/topic 的主題。 不過,不建議明確新增 $shared 主題,因為 $shared 前綴會在需要時自動新增。 資料流程可以使用群組名稱進行最佳化 (如果它未設定的話)。 例如,未設定 $share,且資料流程僅需對主題名稱進行運作。

重要事項

當使用事件方格 MQTT 代理程式作為來源時,實例數大於一時需要共用訂閱的資料流程非常重要,因為它不支援共用訂閱。 為了避免遺失訊息,在使用事件方格 MQTT 代理作為來源時,請將資料流程設定檔實例計數設為 1。 那就是當資料流程是訂閱者並從雲端中接收訊息的時候。

Kafka 主題

當來源是 Kafka (包含事件中樞) 端點時,請指定要訂閱傳入訊息的個別 Kafka 主題。 不支援萬用字元,因此您必須以靜態方式指定每個主題。

附註

透過 Kafka 端點使用事件中樞時,命名空間內的每個個別事件中樞都是 Kafka 主題。 如果您有一個包含兩個事件中樞 (thermostatshumidifiers) 的事件中樞命名空間,則可以將每個事件中樞指定為一個 Kafka 主題。

若要設定 Kafka 主題:

在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式],然後使用 [主題] 欄位來指定要訂閱傳入訊息的 Kafka 主題篩選條件。

附註

在作業體驗中只能指定一個主題篩選條件。 若要使用多個主題篩選條件,請使用 Bicep 或 Kubernetes。

指定來源結構描述

使用 MQTT 或 Kafka 作為來源時,您可以指定 架構 來顯示作業體驗 Web UI 中的數據點清單。 目前不支援使用結構描述來還原序列化和驗證傳入訊息。

如果來源是資產,則會自動從資產定義中推斷結構描述。

秘訣

若要從範例資料檔案中產生結構描述,請使用 Schema Gen Helper

若要設定用於還原序列化來自來源的傳入訊息的結構描述:

在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式] 並使用 [訊息結構描述] 欄位來指定結構描述。 您可以使用 [上傳] 按鈕來先上傳結構描述檔案。 若要深入了解,請參閱了解訊息結構描述

若要深入了解,請參閱了解訊息結構描述

轉換

轉換作業是在您將資料傳送至目的地之前,從來源轉換資料的位置。 轉換是選用的。 如果您不需要變更資料,請勿在資料流程組態中包含轉換作業。 無論在組態中指定的順序為何,多個轉換會分階段鏈結在一起。 階段的順序一律是:

  1. 擴充:在給定資料集和比對條件的情況下將其他資料新增至來源資料。
  2. 篩選:根據條件篩選資料。
  3. 對應計算重新命名或新增新屬性:使用選用轉換將資料從一個欄位移至另一個欄位。

本節是資料流程轉換的簡介。 如需詳細資訊,請參閱使用資料流程來對應資料使用資料流程轉換來轉換資料,以及使用資料流程來擴充資料

在作業體驗中,選取 [資料流程]>[新增轉換 (選用)]

作業體驗介面的螢幕快照,其中顯示將轉換階段新增至數據流。

擴充:新增參考資料

若要擴充資料,請先在 Azure IoT 操作狀態存放區中新增參考資料集。 資料集用於根據條件,將額外的資料新增至來源資料。 條件會指定為來源資料中的欄位,該欄位符合資料集中的欄位。

您可以使用狀態存放區 CLI 來將範例資料載入狀態存放區中。 狀態存放區中的索引鍵名稱對應於資料流程組態中的資料集。

目前,在作業體驗中不支援擴充階段。

如果資料集具有 asset 欄位的記錄,類似於:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

來自來源且 deviceId 欄位與 thermostat1 相符的資料在篩選和對應階段中具有可用的 locationmanufacturer 欄位。

如需條件語法的詳細資訊,請參閱使用資料流程來擴充資料使用資料流程來轉換資料

篩選:根據條件篩選資料

若要根據條件篩選資料,您可使用 filter 階段。 條件會指定為來源資料中符合某個值的欄位。

  1. 在 [轉換 (選用)] 下,選取 [篩選]>[新增]

    使用作業體驗來新增篩選轉換的螢幕擷取畫面。

  2. 輸入必要的設定。

    設定 描述
    篩選條件 根據來源資料中的欄位來篩選資料的條件。
    描述 提供篩選條件的描述。

    在篩選條件欄位中,輸入 @ 或選取 [Ctrl + 空格鍵] 以從下拉式清單中選擇資料點。

    您可以使用 @$metadata.user_properties.<property>@$metadata.topic 格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header> 來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata 語法。 如需詳細資訊,請參閱欄位參考

    條件可以使用來源資料中的欄位。 例如,您可以使用 @temperature > 20 之類的篩選條件來根據溫度欄位篩選小於或等於 20 的資料。

  3. 選取 [套用]

對應:將資料從一個欄位移到另一個欄位

若要透過選用轉換將資料對應至另一個欄位,您可使用 map 作業。 轉換會指定為使用來源資料中欄位的公式。

在作業體驗中,目前支援使用計算重新命名新增屬性轉換來進行對應。

計算

您可以使用計算轉換來將公式套用至來源資料。 此作業可用來將公式套用至來源資料並儲存結果欄位。

  1. 在 [轉換 (選用)] 下,選取 [計算]> [新增]

    使用作業體驗來新增計算轉換的螢幕擷取畫面。

  2. 輸入必要的設定。

    設定 描述
    選取公式 從下拉式清單中選擇現有的公式,或選取 [自訂] 以手動輸入公式。
    輸出 指定結果的輸出顯示名稱。
    公式 輸入要套用至來源資料的公式。
    描述 提供轉換的描述。
    上一個已知的值 或者,如果目前值無法使用,請使用最後一個已知值。

    您可以在 [公式] 欄位中輸入或編輯公式。 公式可以使用來源資料中的欄位。 輸入 @ 或選取 [Ctrl + 空格鍵] 以從下拉式清單中選擇資料點。 針對內建公式,選取 <dataflow> 預留位置以查看可用的資料點清單。

    您可以使用 @$metadata.user_properties.<property>@$metadata.topic 格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header> 來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata 語法。 如需詳細資訊,請參閱欄位參考

    公式可以使用來源資料中的欄位。 例如,您可使用來源資料中的 temperature 欄位,將溫度轉換為攝氏度並將其儲存在 temperatureCelsius 輸出欄位中。

  3. 選取 [套用]

重新命名

您可以使用重新命名轉換來重新命名資料點。 此作業可用來將來源資料中的資料點重新命名為新的名稱。 新的名稱可用於資料流程的後續階段。

  1. 在 [轉換 (選用)] 下,選取 [重新命名]>[新增]

    使用作業體驗來重新命名資料點的螢幕擷取畫面。

  2. 輸入必要的設定。

    設定 描述
    數據點 從下拉式清單中選取資料點,或輸入 $metadata 標頭。
    新增資料點名稱 輸入資料點的新名稱。
    描述 提供轉換的描述。

    您可以使用 @$metadata.user_properties.<property>@$metadata.topic 格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header> 來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata 語法。 如需詳細資訊,請參閱欄位參考

  3. 選取 [套用]

新屬性

您可以使用新屬性轉換來將新的屬性新增至來源資料中。 此作業可用來將新的屬性新增至來源資料中。 新的屬性可用於資料流程的後續階段。

  1. 在 [轉換 (選用)] 下,選取 [新增屬性]> [新增]

    使用作業體驗來新增屬性的螢幕擷取畫面。

  2. 輸入必要的設定。

    設定 描述
    屬性索引鍵 輸入新屬性的索引鍵。
    屬性值 輸入新屬性的值。
    描述 提供新增屬性的描述。
  3. 選取 [套用]

若要深入了解,請參閱使用資料流程來對應資料使用資料流程來轉換資料

移除

根據預設,所有數據點都會包含在輸出架構中。 您可以使用 移除 轉換,從目的地移除任何數據點。

  1. 轉換 (可選) 下,選取 移除

  2. 選取要從輸出架構中移除的數據點。

    使用作業體驗來移除加權資料點輸出結構描述的螢幕擷取畫面。

  3. 選取 [套用]

若要深入了解,請參閱使用資料流程來對應資料使用資料流程來轉換資料

根據結構描述來序列化資料

如果您想要在將資料傳送至目的地之前序列化資料,則需要指定結構描述和序列化格式。 否則,資料會以 JSON 格式和推斷的類型進行序列化。 Microsoft Fabric 或 Azure Data Lake 等儲存體端點需要結構描述來確保資料一致性。 支援的序列化格式為 Parquet 和 Delta。

秘訣

若要從範例資料檔案中產生結構描述,請使用 Schema Gen Helper

對於作業體驗,您可以在資料流程端點詳細資料中指定結構描述和序列化格式。 支援序列化格式的端點為 Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure 資料總管和本機儲存體。 例如,若要以 Delta 格式來序列化資料,您需要將結構描述上傳到結構描述登錄並在資料流程目的地端點組態中參考它。

使用作業體驗來設定資料流程目的地端點序列化的螢幕擷取畫面。

如需結構描述登錄的詳細資訊,請參閱了解訊息結構描述

目的地

若要設定資料流程的目的地,請指定端點參考和資料目的地。 您可指定端點的資料目的地清單。

若要將資料傳送至本機 MQTT 代理程式以外的目的地,請建立資料流程端點。 若要了解如何,請參閱設定資料流程端點。 如果目的地不是本機 MQTT 代理程式,則它必須用作來源。 若要深入瞭解如何使用本機 MQTT 訊息代理程式端點,請參閱 數據流必須使用本機 MQTT 訊息代理程式端點

重要事項

儲存體端點需要結構描述來進行序列化。 若要將資料流程與 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure 資料總管或本機存放區一起使用,則您必須指定結構描述參考

  1. 選取要用作目的地的資料流程端點。

    使用作業體驗來選取事件中樞目的地端點的螢幕擷取畫面。

    儲存體端點需要結構描述來進行序列化。 如果您選擇 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure 資料總管或本機儲存體目的地端點,則必須指定結構描述參考。 例如,若要將資料以 Delta 格式序列化到 Microsoft Fabric 端點,您需要將結構描述上傳到結構描述登錄並在資料流程目的地端點組態中參考它。

    使用作業體驗來選擇輸出結構描述和序列化格式的螢幕擷取畫面。

  2. 選取 [繼續] 以設定目的地。

  3. 輸入目的地的必要設定,包括要將資料傳送到其中的主題或資料表。 如需詳細資訊,請參閱設定資料目的地 (主題、容器或資料表)

設定資料目的地 (主題、容器或資料表)

與資料來源類似,資料目的地是一個用於保持資料流程端點在多個資料流程之間可重複使用的概念。 基本上,它代表資料流程端點組態中的子目錄。 例如,如果資料流程端點是儲存體端點,則資料目的地是儲存體帳戶中的資料表。 如果資料流程端點是 Kafka 端點,則資料目的地是 Kafka 主題。

端點類型 資料目的地意義 描述
MQTT (或事件方格) 主題 傳送資料所在的 MQTT 主題。 僅支援靜態主題,不支援萬用字元。
Kafka (或事件中樞) 主題 傳送資料的 Kafka 主題。 僅支援靜態主題,不支援萬用字元。 如果端點是事件中樞命名空間,則資料目的地是該命名空間內的個別事件中樞。
Azure Data Lake 儲存體 容器 儲存體帳戶中的容器。 不是資料表。
Microsoft Fabric OneLake 資料表或資料夾 對應於端點所設定的路徑類型
Azure 資料總管 Azure 資料總管資料庫中的資料表。
本機儲存體 資料夾 本機存放區永續性磁碟區掛接中的資料夾或目錄名稱。 使用由 Azure Arc Cloud Ingest Edge 磁碟區啟用的 Azure 容器儲存體時,這必須與您所建立的子磁碟區的 spec.path 參數相符。

若要設定資料目的地:

使用作業體驗時,資料目的地欄位會根據端點類型來自動解譯。 例如,如果資料流程端點是儲存體端點,則目的地詳細資料頁面會提示您輸入容器名稱。 如果資料流程端點是 MQTT 端點,則目的地詳細資料頁面會提示您輸入主題等項目。

此螢幕擷取畫面顯示了提示使用者根據端點類型來輸入 MQTT 主題的作業體驗。

範例

以下範例是將 MQTT 端點用於來源和目的地的資料流程組態。 來源會篩選 MQTT 主題 azure-iot-operations/data/thermostat 中的資料。 轉換將溫度轉換為華氏度,並篩選溫度乘以濕度小於 100000 的資料。 目的地會將資料傳送至 MQTT 主題 factory

此螢幕擷取畫面顯示了具有來源端點、轉換和目的地端點的作業體驗資料流程範例。

若要查看資料流程組態的更多範例,請參閱 Azure REST API - 資料流程快速入門 Bicep

驗證資料流程是否正常運作

請遵循教學指導:與 Azure 事件方格的雙向 MQTT 橋接,以確認資料流程是否正常運作。

匯出資料流程組態

若要匯出數據流組態,您可以使用作業體驗,或匯出數據流自定義資源。

選取您想要匯出的資料流程,然後從工具列中選取 [匯出]

作業體驗介面的螢幕快照,其中顯示已設定數據流的導出選項。

正確的資料流程組態

若要確保資料流程按預期運作,請驗證以下內容:

後續步驟