重要事項
本頁面包含使用 Kubernetes 部署資訊清單 (處於預覽狀態) 來管理 Azure IoT 操作元件的指示。 此功能以數項限制提供,不應用於生產工作負載。
請參閱 Microsoft Azure 預覽版增補使用規定,以了解適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的法律條款。
資料流程是透過選用轉換從來源到目的地取得資料的路徑。 您可以藉由建立 數據流 自訂資源或使用作業體驗 Web UI 來設定資料流。 資料流程由三個部分組成:來源、轉換,以及目的地。
若要定義來源和目的地,您必須設定資料流程端點。 轉換是選擇性的,可以包含擴充資料、篩選資料,以及將資料對應至另一個欄位等作業。
重要事項
每個資料流程都必須將 Azure IoT Operations 本機 MQTT 代理程式的預設端點作為來源或目的地。
您可以使用 Azure IoT 操作中的作業體驗來建立資料流程。 作業體驗提供視覺化介面來設定資料流程。 您也可以使用 Bicep 來使用 Bicep 檔案建立數據流,或使用 Kubernetes 使用 YAML 檔案建立數據流。
繼續閱讀以了解如何設定來源、轉換和目的地。
先決條件
一旦您擁有 Azure IoT 操作的實例,就可以使用預設資料流程設定檔和端點來部署資料流程。 不過,您可能想要設定資料流程設定檔和端點來自訂資料流程。
資料流程設定檔
如果不需要對資料流程進行不同的縮放設定,請使用 Azure IoT 操作所提供的預設資料流程設定檔。 您應該避免將太多數據流與單一數據流配置檔產生關聯。 如果您有大量的數據流,請將它們分散到多個數據流配置檔,以減少超過數據流配置檔組態大小限制的風險。
若要瞭解如何設定新的資料流配置檔,請參閱 設定數據流配置檔。
資料流程端點
需要資料流程端點來設定資料流程的來源和目的地。 若要快速開始,您可以使用本機 MQTT 代理程式的預設資料流程端點。 您也可以建立其他類型的資料流程端點,例如 Kafka、事件中樞或 Azure Data Lake Storage。 若要了解如何設定每種資料流程端點的類型,請參閱設定資料流程端點。
開始使用
一旦擁有先決條件後,您就可以開始建立資料流程。
若要在作業體驗中建立資料流程,請選取 [資料流程]>[建立資料流程]。
選取佔位元名稱 new-data-flow 以設定數據流屬性。 輸入數據流的名稱,然後選擇要使用的數據流配置檔。 預設數據流配置檔會自動被選取。 如需數據流配置檔的詳細資訊,請參閱 設定數據流配置檔。
重要事項
您只能在建立資料流程時選擇資料流程設定檔。 建立資料流之後,您無法變更數據流配置檔。
如果您想要變更現有資料流的數據流配置檔,請刪除原始數據流,並使用新的數據流配置檔建立新的資料流配置檔。
選取數據流程圖中的專案,以設定數據流的來源、轉換和目的地端點。
使用 az iot ops dataflow apply 命令來建立或變更數據流。
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
參數 --config-file
是 JSON 組態檔的路徑和檔名,其中包含資源屬性。
在此範例中,假設名為 data-flow.json
的組態檔,其中包含儲存在使用者主目錄中的下列內容:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
// See source configuration section
}
},
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
// See transformation configuration section
}
},
{
"operationType": "Destination",
"destinationSettings": {
// See destination configuration section
}
}
]
}
以下是使用預設資料流設定檔建立或更新資料流的範例命令:
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
建立 Bicep .bicep
檔案以開始建立資料流程。 此範例顯示包含來源、轉換和目的地組態之資料流程的結構。
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// See source configuration section
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// See transformation configuration section
}
}
{
operationType: 'Destination'
destinationSettings: {
// See destination configuration section
}
}
]
}
}
建立 Kubernetes 資訊清單 .yaml
檔案以開始建立資料流程。 此範例顯示包含來源、轉換和目的地組態之資料流程的結構。
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
# This field is required when configuring via Kubernetes YAML
# The syntax is different when using Bicep
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
# Transformation optional
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
檢閱下列各節以了解如何設定資料流程的作業類型。
來源
若要設定資料流程的來源,請指定端點參考和端點的資料來源清單。 選擇下列其中一個選項作為資料流程的來源。
如果預設端點未作為來源使用,則必須將其用作目的地。 若要深入瞭解如何使用本機 MQTT 訊息代理程式端點,請參閱 數據流必須使用本機 MQTT 訊息代理程式端點。
選項 1:使用預設訊息代理程式端點作為來源
在 [來源詳細資料] 下,選取 [訊息代理程式]。
針對訊息代理程式來源輸入以下設定:
設定 |
描述 |
資料流程端點 |
選取 [預設] 以使用預設 MQTT 訊息代理程式端點。 |
主題 |
用於訂閱傳入訊息的主題篩選條件。 使用 [主題]> [新增資料列] 來新增多個主題。 如需主題的詳細資訊,請參閱設定 MQTT 或 Kafka 主題。 |
訊息結構描述 |
用於反序列化傳入訊息的結構描述。 請參閱指定結構描述來還原序列化資料。 |
選取 [套用]。
以下是預設 MQTT 訊息代理程式端點的範例來源端點組態:
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default"
}
}
訊息代理程式端點是在 Bicep 檔案中設定的。 例如,下列端點是資料流程的來源。
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
例如,若要使用訊息代理程式端點和兩個主題篩選條件來設定來源,請使用下列組態:
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
因為 dataSources
可讓您指定 MQTT 或 Kafka 主題而無需修改端點組態,所以即使主題不同,您也可以將端點重複用於多個資料流程。 若要深入了解,請參閱設定資料來源。
選項 2:使用資產作為來源
您可以使用資產作為資料流程的來源。 使用資產作為來源僅在作業體驗中可用。
在 [來源詳細資料]下,選取 [資產]。
選取您要用作來源端點的資產。
選取 [繼續]。
即會顯示所選取資產的資料點清單。
選取 [套用] 以使用資產作為來源端點。
使用資產作為來源時,會使用資產定義來推斷資料流程的結構描述。 資產定義包含資產資料點的結構描述。 若要深入了解,請參閱從遠端管理資產組態。
設定之後,資產中的資料會透過本機 MQTT 代理程式到達資料流程。 因此,當使用資產作為來源時,資料流程實際上會使用本機 MQTT 代理程式預設端點作為來源。
選項 3:使用自訂 MQTT 或 Kafka 資料流程端點作為來源
如果您已建立自訂 MQTT 或 Kafka 資料流程端點 (例如,以搭配事件方格或事件中樞使用),則可以將其用作資料流程的來源。 請記住,儲存體類型端點 (如 Data Lake 或 Fabric OneLake) 不能用作來源。
在 [來源詳細資料] 下,選取 [訊息代理程式]。
針對訊息代理程式來源輸入以下設定:
選取 [套用]。
將預留位置值取代為您的自訂端點名稱和主題。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_1>",
"<TOPIC_2>"
]
}
}
將預留位置值取代為您的自訂端點名稱和主題。
sourceSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_1>'
'<TOPIC_2>'
// See section on configuring MQTT or Kafka topics for more information
]
}
將預留位置值取代為您的自訂端點名稱和主題。
sourceSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataSources:
- <TOPIC_1>
- <TOPIC_2>
# See section on configuring MQTT or Kafka topics for more information
您可以在一個來源中指定多個 MQTT 或 Kafka 主題,而無需修改資料流程端點組態。 這種彈性表示即使主題不同,也可以在多個資料流程中重複使用相同的端點。 如需詳細資訊,請參閱重複使用資料流程端點。
MQTT 主題
當來源是 MQTT (包含事件方格) 端點時,您可以使用 MQTT 主題篩選條件來訂閱傳入訊息。 主題篩選條件可以包含萬用字元來訂閱多個主題。 例如, thermostats/+/sensor/temperature/#
訂閱控溫器的所有溫度感測器訊息。 若要設定 MQTT 主題篩選條件:
在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式],然後使用 [主題] 欄位來指定要訂閱傳入訊息的 MQTT 主題篩選條件。 您可以選取 [新增資料列] 並輸入新主題,以新增多個 MQTT 主題。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<MESSAGE_BROKER_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_FILTER_1>",
"<TOPIC_FILTER_2>"
// Add more topic filters as needed
]
}
}
使用萬用字元的多個 MQTT 主題篩選條件的範例:
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
]
}
}
在這裡,萬用字元 +
會用來選取 thermostats
和 humidifiers
主題下的所有裝置。 #
通配符用於選取 temperature
和 humidity
主題所有子主題下的所有感測器訊息。
sourceSettings: {
endpointRef: '<MESSAGE_BROKER_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_FILTER_1>'
'<TOPIC_FILTER_2>'
// Add more topic filters as needed
]
}
使用萬用字元的多個 MQTT 主題篩選條件的範例:
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
在這裡,萬用字元 +
會用來選取 thermostats
和 humidifiers
主題下的所有裝置。 #
通配符用於選取 temperature
和 humidity
主題所有子主題下的所有感測器訊息。
sourceSettings:
endpointRef: <ENDPOINT_NAME>
dataSources:
- <TOPIC_FILTER_1>
- <TOPIC_FILTER_2>
# Add more topic filters as needed
使用萬用字元的多個主題篩選條件的範例:
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
在這裡,萬用字元 +
會用來選取 thermostats
和 humidifiers
主題下的所有裝置。 #
通配符用於選取 temperature
和 humidity
主題下所有子主題的所有訊息。
共用訂閱
若要搭配訊息代理程式來源使用共用訂閱,您可以使用 $shared/<GROUP_NAME>/<TOPIC_FILTER>
的形式來指定共用訂閱主題。
在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式] 並使用 [主題] 欄位來指定共用訂閱群組和主題。
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"$shared/<GROUP_NAME>/<TOPIC_FILTER>"
]
}
}
sourceSettings: {
dataSources: [
'$shared/<GROUP_NAME>/<TOPIC_FILTER>'
]
}
sourceSettings:
dataSources:
- $shared/<GROUP_NAME>/<TOPIC_FILTER>
如果資料流程設定檔中的實例計數大於一,則會自動為使用訊息代理程式來源的所有資料流程啟用共用訂閱。 在此情況下,會新增 $shared
前置詞並自動產生共用訂閱群組名稱。 例如,如果您有執行個體計數為 3 的資料流程設定檔,且您的資料流程使用訊息代理程式端點作為以主題 topic1
和 topic2
設定的來源,它們會自動轉換成共用訂閱 $shared/<GENERATED_GROUP_NAME>/topic1
和 $shared/<GENERATED_GROUP_NAME>/topic2
。
您可以在組態中明確建立名為 $shared/mygroup/topic
的主題。 不過,不建議明確新增 $shared
主題,因為 $shared
前綴會在需要時自動新增。 資料流程可以使用群組名稱進行最佳化 (如果它未設定的話)。 例如,未設定 $share
,且資料流程僅需對主題名稱進行運作。
重要事項
當使用事件方格 MQTT 代理程式作為來源時,實例數大於一時需要共用訂閱的資料流程非常重要,因為它不支援共用訂閱。 為了避免遺失訊息,在使用事件方格 MQTT 代理作為來源時,請將資料流程設定檔實例計數設為 1。 那就是當資料流程是訂閱者並從雲端中接收訊息的時候。
Kafka 主題
當來源是 Kafka (包含事件中樞) 端點時,請指定要訂閱傳入訊息的個別 Kafka 主題。 不支援萬用字元,因此您必須以靜態方式指定每個主題。
附註
透過 Kafka 端點使用事件中樞時,命名空間內的每個個別事件中樞都是 Kafka 主題。 如果您有一個包含兩個事件中樞 (thermostats
和 humidifiers
) 的事件中樞命名空間,則可以將每個事件中樞指定為一個 Kafka 主題。
若要設定 Kafka 主題:
在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式],然後使用 [主題] 欄位來指定要訂閱傳入訊息的 Kafka 主題篩選條件。
附註
在作業體驗中只能指定一個主題篩選條件。 若要使用多個主題篩選條件,請使用 Bicep 或 Kubernetes。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<KAFKA_ENDPOINT_NAME>",
"dataSources": [
"<KAFKA_TOPIC_1>",
"<KAFKA_TOPIC_2>"
// Add more Kafka topics as needed
]
}
}
sourceSettings: {
endpointRef: '<KAFKA_ENDPOINT_NAME>'
dataSources: [
'<KAFKA_TOPIC_1>'
'<KAFKA_TOPIC_2>'
// Add more Kafka topics as needed
]
}
sourceSettings:
endpointRef: <KAFKA_ENDPOINT_NAME>
dataSources:
- <KAFKA_TOPIC_1>
- <KAFKA_TOPIC_2>
# Add more Kafka topics as needed
指定來源結構描述
使用 MQTT 或 Kafka 作為來源時,您可以指定 架構 來顯示作業體驗 Web UI 中的數據點清單。 目前不支援使用結構描述來還原序列化和驗證傳入訊息。
如果來源是資產,則會自動從資產定義中推斷結構描述。
若要設定用於還原序列化來自來源的傳入訊息的結構描述:
在作業體驗資料流程來源詳細資料中,選取 [訊息代理程式] 並使用 [訊息結構描述] 欄位來指定結構描述。 您可以使用 [上傳] 按鈕來先上傳結構描述檔案。 若要深入了解,請參閱了解訊息結構描述。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<ENDPOINT_NAME>",
"serializationFormat": "Json",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>"
}
}
一旦您使用了結構描述登錄來儲存結構描述,您就可以在資料流程組態中參考它。
sourceSettings: {
serializationFormat: 'Json'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
}
一旦您使用了結構描述登錄來儲存結構描述,您就可以在資料流程組態中參考它。
sourceSettings:
serializationFormat: Json
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
若要深入了解,請參閱了解訊息結構描述。
轉換作業是在您將資料傳送至目的地之前,從來源轉換資料的位置。 轉換是選用的。 如果您不需要變更資料,請勿在資料流程組態中包含轉換作業。 無論在組態中指定的順序為何,多個轉換會分階段鏈結在一起。 階段的順序一律是:
- 擴充:在給定資料集和比對條件的情況下將其他資料新增至來源資料。
- 篩選:根據條件篩選資料。
- 對應、計算、重新命名或新增新屬性:使用選用轉換將資料從一個欄位移至另一個欄位。
本節是資料流程轉換的簡介。 如需詳細資訊,請參閱使用資料流程來對應資料、使用資料流程轉換來轉換資料,以及使用資料流程來擴充資料。
在作業體驗中,選取 [資料流程]>[新增轉換 (選用)]。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
// See section on enriching data
],
"filter": [
// See section on filtering data
],
"map": [
// See section on mapping data
]
}
}
builtInTransformationSettings: {
datasets: [
// See section on enriching data
]
filter: [
// See section on filtering data
]
map: [
// See section on mapping data
]
}
builtInTransformationSettings:
datasets:
# See section on enriching data
filter:
# See section on filtering data
map:
# See section on mapping data
擴充:新增參考資料
若要擴充資料,請先在 Azure IoT 操作狀態存放區中新增參考資料集。 資料集用於根據條件,將額外的資料新增至來源資料。 條件會指定為來源資料中的欄位,該欄位符合資料集中的欄位。
您可以使用狀態存放區 CLI 來將範例資料載入狀態存放區中。 狀態存放區中的索引鍵名稱對應於資料流程組態中的資料集。
若要擴充數據,您可以在數據流組態中使用 builtInTransformationSettings
屬性。 屬性 datasets
是用來指定要用於擴充的數據集。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
{
"key": "<DATASET_KEY>",
"inputs": [
"$source.<SOURCE_FIELD>" // ---------------- $1
"$context(<DATASET_KEY>).<DATASET_FIELD>" // - $2
],
"expression": "$1 == $2"
}
]
}
}
此範例顯示如何使用來源資料中的 deviceId
欄位來符合資料集中的 asset
欄位:
builtInTransformationSettings: {
datasets: [
{
key: 'assetDataset'
inputs: [
'$source.deviceId' // ---------------- $1
'$context(assetDataset).asset' // ---- $2
]
expression: '$1 == $2'
}
]
}
例如,您可使用來源資料中的 deviceId
欄位來比對資料集中的 asset
欄位:
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
如果資料集具有 asset
欄位的記錄,類似於:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
來自來源且 deviceId
欄位與 thermostat1
相符的資料在篩選和對應階段中具有可用的 location
和 manufacturer
欄位。
如需條件語法的詳細資訊,請參閱使用資料流程來擴充資料和使用資料流程來轉換資料。
篩選:根據條件篩選資料
若要根據條件篩選資料,您可使用 filter
階段。 條件會指定為來源資料中符合某個值的欄位。
在 [轉換 (選用)] 下,選取 [篩選]>[新增]。
輸入必要的設定。
設定 |
描述 |
篩選條件 |
根據來源資料中的欄位來篩選資料的條件。 |
描述 |
提供篩選條件的描述。 |
在篩選條件欄位中,輸入 @
或選取 [Ctrl + 空格鍵] 以從下拉式清單中選擇資料點。
您可以使用 @$metadata.user_properties.<property>
或 @$metadata.topic
格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header>
來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata
語法。 如需詳細資訊,請參閱欄位參考。
條件可以使用來源資料中的欄位。 例如,您可以使用 @temperature > 20
之類的篩選條件來根據溫度欄位篩選小於或等於 20 的資料。
選取 [套用]。
例如,您可使用來源資料中的 temperature
欄位來篩選資料:
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"filter": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"expression": "$1 > 20"
}
]
}
}
例如,您可使用來源資料中的 temperature
欄位來篩選資料:
builtInTransformationSettings: {
filter: [
{
inputs: [
'temperature ? $last'
]
expression: '$1 > 20'
}
]
}
如果 temperature
欄位大於 20,則會將資料傳遞至下一個階段。 如果 temperature
欄位小於或等於 20,則會篩選資料。
例如,您可使用來源資料中的 temperature
欄位來篩選資料:
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
如果 temperature
欄位大於 20,則會將資料傳遞至下一個階段。 如果 temperature
欄位小於或等於 20,則會篩選資料。
對應:將資料從一個欄位移到另一個欄位
若要透過選用轉換將資料對應至另一個欄位,您可使用 map
作業。 轉換會指定為使用來源資料中欄位的公式。
在作業體驗中,目前支援使用計算、重新命名和新增屬性轉換來進行對應。
計算
您可以使用計算轉換來將公式套用至來源資料。 此作業可用來將公式套用至來源資料並儲存結果欄位。
在 [轉換 (選用)] 下,選取 [計算]> [新增]。
輸入必要的設定。
設定 |
描述 |
選取公式 |
從下拉式清單中選擇現有的公式,或選取 [自訂] 以手動輸入公式。 |
輸出 |
指定結果的輸出顯示名稱。 |
公式 |
輸入要套用至來源資料的公式。 |
描述 |
提供轉換的描述。 |
上一個已知的值 |
或者,如果目前值無法使用,請使用最後一個已知值。 |
您可以在 [公式] 欄位中輸入或編輯公式。 公式可以使用來源資料中的欄位。 輸入 @
或選取 [Ctrl + 空格鍵] 以從下拉式清單中選擇資料點。 針對內建公式,選取 <dataflow>
預留位置以查看可用的資料點清單。
您可以使用 @$metadata.user_properties.<property>
或 @$metadata.topic
格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header>
來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata
語法。 如需詳細資訊,請參閱欄位參考。
公式可以使用來源資料中的欄位。 例如,您可使用來源資料中的 temperature
欄位,將溫度轉換為攝氏度並將其儲存在 temperatureCelsius
輸出欄位中。
選取 [套用]。
重新命名
您可以使用重新命名轉換來重新命名資料點。 此作業可用來將來源資料中的資料點重新命名為新的名稱。 新的名稱可用於資料流程的後續階段。
在 [轉換 (選用)] 下,選取 [重新命名]>[新增]。
輸入必要的設定。
設定 |
描述 |
數據點 |
從下拉式清單中選取資料點,或輸入 $metadata 標頭。 |
新增資料點名稱 |
輸入資料點的新名稱。 |
描述 |
提供轉換的描述。 |
您可以使用 @$metadata.user_properties.<property>
或 @$metadata.topic
格式來輸入 MQTT 中繼資料屬性。 您也可以使用格式 @$metadata.<header>
來輸入 $metadata 標頭。 只有屬於訊息標頭一部分的 MQTT 屬性才需要 $metadata
語法。 如需詳細資訊,請參閱欄位參考。
選取 [套用]。
新屬性
您可以使用新屬性轉換來將新的屬性新增至來源資料中。 此作業可用來將新的屬性新增至來源資料中。 新的屬性可用於資料流程的後續階段。
在 [轉換 (選用)] 下,選取 [新增屬性]> [新增]。
輸入必要的設定。
設定 |
描述 |
屬性索引鍵 |
輸入新屬性的索引鍵。 |
屬性值 |
輸入新屬性的值。 |
描述 |
提供新增屬性的描述。 |
選取 [套用]。
例如,您可使用來源資料中的 temperature
欄位,將溫度轉換成攝氏並將其儲存在 temperatureCelsius
欄位中。 您也可以使用內容化資料集的 location
欄位來擴充來源資料:
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"output": "temperatureCelsius",
"expression": "($1 - 32) * 5/9"
},
{
"inputs": [
"$context(assetDataset).location" // - $2
],
"output": "location"
}
]
}
}
您可以使用 $metadata.user_properties.<property>
或 $metadata.topic
格式來存取 MQTT 中繼資料屬性。 您也可以使用格式 $metadata.<header>
來輸入 $metadata 標頭。 如需詳細資訊,請參閱欄位參考。
例如,您可使用來源資料中的 temperature
欄位,將溫度轉換成攝氏並將其儲存在 temperatureCelsius
欄位中。 您也可以使用內容化資料集的 location
欄位來擴充來源資料:
builtInTransformationSettings: {
map: [
{
inputs: [
'temperature'
]
output: 'temperatureCelsius'
expression: '($1 - 32) * 5/9'
}
{
inputs: [
'$context(assetDataset).location'
]
output: 'location'
}
]
}
您可以使用 $metadata.user_properties.<property>
或 $metadata.topic
格式來存取 MQTT 中繼資料屬性。 您也可以使用格式 $metadata.<header>
來輸入 $metadata 標頭。 如需詳細資訊,請參閱欄位參考。
例如,您可使用來源資料中的 temperature
欄位,將溫度轉換成攝氏並將其儲存在 temperatureCelsius
欄位中。 您也可以使用內容化資料集的 location
欄位來擴充來源資料:
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
expression: "($1 - 32) * 5/9"
output: temperatureCelsius
- inputs:
- $context(assetDataset).location
output: location
若要深入了解,請參閱使用資料流程來對應資料及使用資料流程來轉換資料。
移除
根據預設,所有數據點都會包含在輸出架構中。 您可以使用 移除 轉換,從目的地移除任何數據點。
在 轉換 (可選) 下,選取 移除。
選取要從輸出架構中移除的數據點。
選取 [套用]。
若要從輸出架構中移除資料點,您可以在數據流組態中使用 builtInTransformationSettings
屬性。 屬性 map
是用來指定要移除的數據點。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"*"
],
"output": "*"
},
{
"inputs": [
"weight"
],
"output": ""
}
{
"inputs": [
"weight.SourceTimestamp"
],
"output": ""
},
{
"inputs": [
"weight.Value"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Code"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Symbol"
],
"output": ""
}
]
}
}
builtInTransformationSettings: {
map: [
{
inputs: [
'*'
]
output: '*'
}
{
inputs: [
'weight'
]
output: ''
}
{
inputs: [
'weight.SourceTimestamp'
]
output: ''
}
{
inputs: [
'weight.Value'
]
output: ''
}
{
inputs: [
'weight.StatusCode'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Code'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Symbol'
]
output: ''
}
]
}
builtInTransformationSettings:
map:
- type: PassThrough
inputs:
- "*"
output: "*"
- inputs:
- weight
output: ""
- inputs:
- weight.SourceTimestamp
output: ""
- inputs:
- weight.Value
output: ""
- inputs:
- weight.StatusCode
output: ""
- inputs:
- weight.StatusCode.Code
output: ""
- inputs:
- weight.StatusCode.Symbol
output: ""
若要深入了解,請參閱使用資料流程來對應資料及使用資料流程來轉換資料。
根據結構描述來序列化資料
如果您想要在將資料傳送至目的地之前序列化資料,則需要指定結構描述和序列化格式。 否則,資料會以 JSON 格式和推斷的類型進行序列化。 Microsoft Fabric 或 Azure Data Lake 等儲存體端點需要結構描述來確保資料一致性。 支援的序列化格式為 Parquet 和 Delta。
對於作業體驗,您可以在資料流程端點詳細資料中指定結構描述和序列化格式。 支援序列化格式的端點為 Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure 資料總管和本機儲存體。 例如,若要以 Delta 格式來序列化資料,您需要將結構描述上傳到結構描述登錄並在資料流程目的地端點組態中參考它。
一旦您將結構描述上傳至結構描述登錄,您就可以在資料流程組態中參考它。
{
"builtInTransformationSettings": {
"serializationFormat": "Delta",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>"
}
}
一旦您將結構描述上傳至結構描述登錄,您就可以在資料流程組態中參考它。
builtInTransformationSettings: {
serializationFormat: 'Delta'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
}
一旦您將結構描述上傳至結構描述登錄,您就可以在資料流程組態中參考它。
builtInTransformationSettings:
serializationFormat: Delta
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
如需結構描述登錄的詳細資訊,請參閱了解訊息結構描述。
目的地
若要設定資料流程的目的地,請指定端點參考和資料目的地。 您可指定端點的資料目的地清單。
若要將資料傳送至本機 MQTT 代理程式以外的目的地,請建立資料流程端點。 若要了解如何,請參閱設定資料流程端點。 如果目的地不是本機 MQTT 代理程式,則它必須用作來源。 若要深入瞭解如何使用本機 MQTT 訊息代理程式端點,請參閱 數據流必須使用本機 MQTT 訊息代理程式端點。
重要事項
儲存體端點需要結構描述來進行序列化。 若要將資料流程與 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure 資料總管或本機存放區一起使用,則您必須指定結構描述參考。
選取要用作目的地的資料流程端點。
儲存體端點需要結構描述來進行序列化。 如果您選擇 Microsoft Fabric OneLake、Azure Data Lake Storage、Azure 資料總管或本機儲存體目的地端點,則必須指定結構描述參考。 例如,若要將資料以 Delta 格式序列化到 Microsoft Fabric 端點,您需要將結構描述上傳到結構描述登錄並在資料流程目的地端點組態中參考它。
選取 [繼續] 以設定目的地。
輸入目的地的必要設定,包括要將資料傳送到其中的主題或資料表。 如需詳細資訊,請參閱設定資料目的地 (主題、容器或資料表)。
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
destinationSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataDestination: '<TOPIC_OR_TABLE>' // See section on configuring data destination
}
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE> # See section on configuring data destination
與資料來源類似,資料目的地是一個用於保持資料流程端點在多個資料流程之間可重複使用的概念。 基本上,它代表資料流程端點組態中的子目錄。 例如,如果資料流程端點是儲存體端點,則資料目的地是儲存體帳戶中的資料表。 如果資料流程端點是 Kafka 端點,則資料目的地是 Kafka 主題。
端點類型 |
資料目的地意義 |
描述 |
MQTT (或事件方格) |
主題 |
傳送資料所在的 MQTT 主題。 僅支援靜態主題,不支援萬用字元。 |
Kafka (或事件中樞) |
主題 |
傳送資料的 Kafka 主題。 僅支援靜態主題,不支援萬用字元。 如果端點是事件中樞命名空間,則資料目的地是該命名空間內的個別事件中樞。 |
Azure Data Lake 儲存體 |
容器 |
儲存體帳戶中的容器。 不是資料表。 |
Microsoft Fabric OneLake |
資料表或資料夾 |
對應於端點所設定的路徑類型。 |
Azure 資料總管 |
表 |
Azure 資料總管資料庫中的資料表。 |
本機儲存體 |
資料夾 |
本機存放區永續性磁碟區掛接中的資料夾或目錄名稱。 使用由 Azure Arc Cloud Ingest Edge 磁碟區啟用的 Azure 容器儲存體時,這必須與您所建立的子磁碟區的 spec.path 參數相符。 |
若要設定資料目的地:
使用作業體驗時,資料目的地欄位會根據端點類型來自動解譯。 例如,如果資料流程端點是儲存體端點,則目的地詳細資料頁面會提示您輸入容器名稱。 如果資料流程端點是 MQTT 端點,則目的地詳細資料頁面會提示您輸入主題等項目。
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
例如,若要將資料傳回至本機 MQTT 訊息代理程式靜態 MQTT 主題,請使用下列組態:
{
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "example-topic"
}
}
或者,如果您有自訂事件中樞端點,則組態看起來會像這樣:
{
"destinationSettings": {
"endpointRef": "my-eh-endpoint",
"dataDestination": "individual-event-hub"
}
}
語法對於所有資料流程端點都是相同的:
destinationSettings: {
endpointRef: "<CUSTOM_ENDPOINT_NAME>"
dataDestination: '<TOPIC_OR_TABLE>'
}
例如,若要將資料傳回至本機 MQTT 訊息代理程式靜態 MQTT 主題,請使用下列組態:
destinationSettings: {
endpointRef: 'default'
dataDestination: 'example-topic'
}
或者,如果您有自訂事件中樞端點,則組態看起來會像這樣:
destinationSettings: {
endpointRef: 'my-eh-endpoint'
dataDestination: 'individual-event-hub'
}
另一個使用儲存體端點作為目的地的範例:
destinationSettings: {
endpointRef: 'my-adls-endpoint'
dataDestination: 'my-container'
}
語法對於所有資料流程端點都是相同的:
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE>
例如,若要將資料傳回至本機 MQTT 訊息代理程式靜態 MQTT 主題,請使用下列組態:
destinationSettings:
endpointRef: default
dataDestination: example-topic
或者,如果您有自訂事件中樞端點,則組態看起來會像這樣:
destinationSettings:
endpointRef: my-eh-endpoint
dataDestination: individual-event-hub
另一個使用儲存體端點作為目的地的範例:
destinationSettings:
endpointRef: my-adls-endpoint
dataDestination: my-container
範例
以下範例是將 MQTT 端點用於來源和目的地的資料流程組態。 來源會篩選 MQTT 主題 azure-iot-operations/data/thermostat
中的資料。 轉換將溫度轉換為華氏度,並篩選溫度乘以濕度小於 100000 的資料。 目的地會將資料傳送至 MQTT 主題 factory
。
使用 az iot ops dataflow apply 命令來建立或變更數據流。
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
參數 --config-file
是 JSON 組態檔的路徑和檔名,其中包含資源屬性。
在此範例中,假設名為 data-flow.json
的組態檔,其中包含儲存在使用者主目錄中的下列內容:
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"builtInTransformationSettings": {
"datasets": [],
"filter": [
{
"expression": "$1 * $2 < 100000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value"
],
"type": "Filter"
}
],
"map": [
{
"inputs": [
"*"
],
"output": "*",
"type": "PassThrough"
},
{
"expression": "fToC($1)",
"inputs": [
"Temperature.Value"
],
"output": "TemperatureF",
"type": "Compute"
},
{
"inputs": [
"@\"Tag 10\".Value"
],
"output": "Humidity",
"type": "Rename"
}
],
"serializationFormat": "Json"
},
"operationType": "BuiltInTransformation"
},
{
"destinationSettings": {
"dataDestination": "factory",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
以下是使用預設資料流設定檔建立或更新資料流的範例命令:
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
// Pointer to the default data flow endpoint
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// Use the default MQTT endpoint as the source
endpointRef: defaultDataflowEndpoint.name
// Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources: [
'azure-iot-operations/data/thermostat'
]
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// Filter the data where temperature * "Tag 10" < 100000
filter: [
{
inputs: [
'temperature.Value'
'"Tag 10".Value'
]
expression: '$1 * $2 < 100000'
}
]
map: [
// Passthrough all values by default
{
inputs: [
'*'
]
output: '*'
}
// Convert temperature to Fahrenheit and output it to TemperatureF
{
inputs: [
'temperature.Value'
]
output: 'TemperatureF'
expression: 'cToF($1)'
}
// Extract the "Tag 10" value and output it to Humidity
{
inputs: [
'"Tag 10".Value'
]
output: 'Humidity'
}
]
}
}
{
operationType: 'Destination'
destinationSettings: {
// Use the default MQTT endpoint as the destination
endpointRef: defaultDataflowEndpoint.name
// Send the data to the MQTT topic factory
dataDestination: 'factory'
}
}
]
}
}
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# Use the default MQTT endpoint as the source
endpointRef: default
# Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources:
- azure-iot-operations/data/thermostat
# Transformation optional
- operationType: builtInTransformation
builtInTransformationSettings:
# Filter the data where temperature * "Tag 10" < 100000
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: '$1 * $2 < 100000'
map:
# Passthrough all values by default
- inputs:
- '*'
output: '*'
# Convert temperature to Fahrenheit and output it to TemperatureF
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
# Extract the "Tag 10" value and output it to Humidity
- inputs:
- '"Tag 10".Value'
output: 'Humidity'
- operationType: Destination
destinationSettings:
# Use the default MQTT endpoint as the destination
endpointRef: default
# Send the data to the MQTT topic factory
dataDestination: factory
若要查看資料流程組態的更多範例,請參閱 Azure REST API - 資料流程和快速入門 Bicep。
驗證資料流程是否正常運作
請遵循教學指導:與 Azure 事件方格的雙向 MQTT 橋接,以確認資料流程是否正常運作。
匯出資料流程組態
若要匯出數據流組態,您可以使用作業體驗,或匯出數據流自定義資源。
選取您想要匯出的資料流程,然後從工具列中選取 [匯出]。
使用 az iot ops dataflow show 命令導出數據流。
az iot ops dataflow show --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <DataflowName> --profile <DataflowProfileName> --output json > my-dataflow.json
以下是一個將名為 data-flow
的數據流匯出為名為 data-flow.json
的 JSON 檔案的範例命令:
az iot ops dataflow show --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --output json > data-flow.json
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml
正確的資料流程組態
若要確保資料流程按預期運作,請驗證以下內容:
後續步驟