這很重要
帶有資料流程圖的 WebAssembly(WASM)目前已進入預覽階段。 此功能有限制,不適用於生產工作負載。
如需適用於 Beta 版、預覽版或尚未正式推出的 Azure 功能的法律條款,請參閱適用於 Microsoft Azure 預覽的補充使用規定。
Azure IoT 操作資料流程圖支援 WebAssembly (WASM) 模組,以在邊緣進行自訂資料處理。 您可以將自訂商務規則和資料轉換部署為資料流程管線的一部分。
小提示
想要在帶內運行 AI 嗎? 請參閱 在 WebAssembly 資料流程圖中執行 ONNX 推論 ,以在 WASM 運算子內封裝和執行小型 ONNX 模型。
這很重要
資料流程圖目前僅支援 MQTT、Kafka 和 OpenTelemetry 端點。 其他端點類型 (例如 Data Lake、Microsoft Fabric OneLake、Azure 資料總管和本機存放區) 不受支援。 如需詳細資訊,請參閱
先決條件
- 在已啟用 Arc 的 Kubernetes 叢集上部署 Azure IoT 作業執行個體。 如需詳細資訊,請參閱 部署 Azure IoT 作業。
- 使用 Azure Container Registry (ACR) 來儲存 WASM 模組和圖表。
- 安裝 OCI 登錄即儲存體 (ORAS) CLI,將 WASM 模組推送至登錄。
- 遵循為資料流程圖開發 WebAssembly 模組中的指引,開發自訂 WASM 模組。
概觀
Azure IoT 操作資料流程圖中的 WebAssembly (WASM) 模組可讓您以高效能和安全性處理邊緣的資料。 WASM 會在沙箱化環境中執行,並支援 Rust 和 Python。
WASM 資料流程圖的運作方式
WASM 資料流程實作遵循此工作流程:
- 開發 WASM 模組:以支援的語言撰寫自訂處理邏輯,並將其編譯為 WebAssembly 元件模型格式。
- 開發圖表定義:定義資料如何使用 YAML 設定檔在模組中移動。 如需詳細資訊,請參閱 設定 WebAssembly 圖形定義。
- 將成品儲存在登錄中:使用 ORAS 等與 OCI 相容的工具,將編譯的 WASM 模組推送至容器登錄。
- 設定登錄端點:設定驗證和連線詳細資料,讓 Azure IoT 操作可以存取容器登錄。
- 建立資料流程圖:定義資料來源、工件名稱及目的地。
- 部署和執行:Azure IoT 操作會從登錄提取 WASM 模組,並根據圖表定義執行它們。
以下範例說明如何設置並部署 WASM 資料流程圖以應對常見情境。 這些範例會使用硬式編碼的值和簡化的設定,讓您可以快速開始使用。
設定容器登錄
Azure IoT 操作需要容器登錄來提取 WASM 模組和圖表定義。 您可以使用 Azure Container Registry (ACR) 或另一個與 OCI 相容的登錄。
若要建立及設定 Azure Container Registry,請參閱部署 Azure Container Registry。
安裝 ORAS CLI
使用 ORAS CLI 將 WASM 模組和圖表定義推送至您的容器登錄。 如需安裝指示,請參閱安裝 ORAS。
從公用登錄提取範例模組
使用預先建置的範例模組:
# Pull sample modules and graphs
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-simple:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-complex:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/temperature:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/window:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/snapshot:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/format:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/humidity:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/collection:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/enrichment:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/filter:1.0.0
將模組推送至您的登錄
取得範例模組和圖形之後,請將它們推送至容器登錄。 將 <YOUR_ACR_NAME> 替換為 Azure 容器註冊表的名稱。 為了確保圖形和模組在操作體驗的網頁介面中可見,請如以下範例所示加入 --config 和 --artifact-type 標誌:
# Log in to your ACR
az acr login --name <YOUR_ACR_NAME>
# Push modules to your registry
oras push <YOUR_ACR_NAME>.azurecr.io/graph-simple:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-simple.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/graph-complex:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-complex.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/temperature:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm temperature-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/window:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm window-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/snapshot:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm snapshot-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/format:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm format-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/humidity:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm humidity-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/collection:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm collection-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/enrichment:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm enrichment-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/filter:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm filter-1.0.0.wasm:application/wasm
小提示
您也可以推送自己的模組並建立自訂圖表,請參閱 設定自訂資料流程圖表。
建立登錄端點
登錄端點會定義容器登錄的連線。 資料流程圖會使用登錄端點,從容器登錄提取 WASM 模組和圖表定義。 如需使用不同驗證方法和登錄類型設定登錄端點的詳細資訊,請參閱設定登錄端點。
要建立登錄端點,你可以使用 Azure 入口網站、Bicep 或 Kubernetes 的清單。 建立登錄端點之後,您推送至容器登錄的圖表已準備好用於資料流程圖表中的操作體驗。
如需使用 Azure Container Registry 進行快速設定,請使用系統指派的受控識別驗證來建立登錄端點:
你可以使用 Azure 入口網站來建立登錄端點。 入口網站體驗允許你指定並提供 ACR 的主機資料,並可選擇提供憑證。 在開始之前,請確保你擁有以下資訊:
- 登錄端點名稱。
- ACR 的宿主名稱。
- 支援四種認證方式:
- 匿名
- 系統管理的帳戶識別
- 使用者管理的身分識別
- 神器祕密
要在 Azure 入口網站建立登錄端點,請依照以下步驟操作。
建立帶有匿名認證的登錄端點
你可以透過指定 Azure 容器登錄檔(ACR)的主機細節,啟用匿名存取以進行公開映像擷取,並儲存設定以供重複使用來建立新的登錄端點。 首先,選擇你想使用的認證類型。 在這個例子中,我們使用匿名認證:
建立帶有系統管理身份驗證的登錄端點
你可以透過指定 ACR 的主機細節來建立新的登錄端點,使用系統指定的管理身份進行驗證以確保安全存取,並將設定儲存以便重複使用。
建立帶有使用者管理身份的登錄端點
你可以透過指定 ACR 的主機細節來建立新的登錄端點,使用使用者指定的管理身份進行認證以確保安全存取,並將設定儲存以便重複使用。
備註
啟用使用者管理身份時,必須使用用戶端與租戶 ID。
建立帶有工件秘密的登錄端點
工件秘密用於在拉取容器映像時,與私有容器登錄檔如 ACR、Docker Hub 或 MCR 進行認證。 當登錄檔需要憑證,且映像檔無法公開存取時,秘密是必須的。 此情境讓您能管理 Azure 物聯網營運與營運體驗之間的資料流圖。 從 Azure Key Vault 中選擇現有的機密,可以設定工件機密。
你可以透過指定 ACR 的主機細節,使用工件秘密進行驗證以確保安全存取,並儲存設定以供重複使用,來建立新的登錄端點:
要在 Azure Key Vault 中設定工件的秘密,你需要建立新的秘密並將它們儲存在 Azure Key Vault 中:
備註
您可以跨多個資料流程圖和其他 Azure IoT 操作元件重複使用登錄端點,例如 Akri 連接器。
取得延伸模組名稱
# Get extension name
az k8s-extension list \
--resource-group <RESOURCE_GROUP> \
--cluster-name <CLUSTER_NAME> \
--cluster-type connectedClusters \
--query "[?extensionType=='microsoft.iotoperations'].name" \
--output tsv
第一個命令會傳回延伸模組名稱 (例如,azure-iot-operations-4gh3y)。
設定受控識別權限
若要讓 Azure IoT 操作從容器登錄中提取 WASM 模組,請為受控識別提供正確的權限。 IoT 操作延伸模組會使用系統指派的受控識別,此受控識別需要 Azure Container Registry 上的 AcrPull 角色。 確定您已滿足下列先決條件:
- Azure Container Registry 上的擁有者權限。
- 容器登錄可以位於不同的資源群組或訂用帳戶中,但其必須位於與 IoT 操作部署相同的租用戶中。
執行下列命令,將 AcrPull 角色指派給 IoT 操作受控識別:
# Get the IoT Operations extension managed identity
export EXTENSION_OBJ_ID=$(az k8s-extension list --cluster-name $CLUSTER_NAME -g $RESOURCE_GROUP --cluster-type connectedClusters --query "[?extensionType=='microsoft.iotoperations'].identity.principalId" -o tsv)
# Get the application ID for the managed identity
export SYSTEM_ASSIGNED_MAN_ID=$(az ad sp show --id $EXTENSION_OBJ_ID --query "appId" -o tsv)
# Assign the AcrPull role to the managed identity
az role assignment create --role "AcrPull" --assignee $SYSTEM_ASSIGNED_MAN_ID --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.ContainerRegistry/registries/$ACR_NAME"
如需容器登錄角色的詳細資訊,請參閱 Azure Container Registry 角色和權限。
如果您得到 Azure CLI 的驗證錯誤,請在 Azure 入口網站中指派權限:
- 在 Azure 入口網站中移至您的 Azure Container Registry。
- 從功能表中,選取存取控制 (IAM)。
- 選取 新增>新增角色指派。
- 選擇 AcrPull 內建角色。
- 選取 [使用者、群組或服務主體] 作為 [存取權指派對象] 選項。
- 搜尋並選取您的 IoT 操作延伸模組名稱 (例如,
azure-iot-operations-4gh3y)。 - 選擇儲存以完成角色指派。
如需詳細指示,請參閱使用 Azure 入口網站指派 Azure 角色。
範例 1:搭配一個 WASM 模組的基本部署
此範例會使用 WASM 模組,將溫度資料從華氏轉換為攝氏。 GitHub 上提供溫度模組原始程式碼。 使用您推送至容器登錄的先行編譯版本 graph-simple:1.0.0。
運作方式
圖表定義會建立簡單的三階段管線:
- 來源:從 MQTT 接收溫度資料
- 對應:使用溫度 WASM 模組處理資料
- 接收:將轉換的資料傳回 MQTT
如需簡單圖形定義運作方式及其結構的詳細資訊,請參閱 範例 1:簡單圖形定義。
輸入格式:
{"temperature": {"value": 100.0, "unit": "F"}}
輸出格式:
{"temperature": {"value": 37.8, "unit": "C"}}
下列設定會建立一個使用此溫度轉換管線的資料流程圖。 此圖表會參考 graph-simple:1.0.0 成品,其中包含 YAML 定義,並從容器登錄中提取溫度模組。
設定資料流程圖
此設定會定義三個實作溫度轉換工作流程的節點:訂閱傳入溫度資料的來源節點、執行 WASM 模組的圖形處理節點,以及發佈轉換結果的目的地節點。
資料流程圖資源會「包裝」圖表定義成品,並將其抽象來源/接收作業連線到具體端點:
- 圖表定義的
source作業會連線到資料流程的來源節點 (MQTT 主題) - 圖表定義的
sink作業會連線到資料流程的目的地節點 (MQTT 主題) - 圖表定義的處理作業會在圖表處理節點內執行
此分離可讓您跨環境使用不同的端點來部署相同的圖表定義,同時將處理邏輯保持不變。
若要在 作業體驗中建立資料流程圖,請移至 [資料流程 ] 索引標籤。
選取 [+ 建立] 旁的下拉式功能表,然後選取 [建立資料流程圖]
選取佔位元名稱 new-data-flow 以設定數據流屬性。 輸入資料流程圖的名稱,然後選擇要使用的資料流程設定檔。
在資料流程圖中,選取 [ 來源 ] 以設定來源節點。 在 [來源詳細資料] 底下,選取 [資產] 或 [資料流程端點]。
如果您選取 資產,請選擇要從中提取資料的資產,然後按一下 套用。
如果您選取 [資料流程端點],請輸入下列詳細資料,然後按一下 [套用]。
Setting Description 資料流程端點 選取 [預設] 以使用預設 MQTT 訊息代理程式端點。 主題 用於訂閱傳入訊息的主題篩選條件。 使用 [主題]> [新增資料列] 來新增多個主題。 訊息結構描述 用於反序列化傳入訊息的結構描述。
在資料流程圖中,選取 [ 新增圖形轉換 (選擇性) ] 以新增圖形處理節點。 在 [圖形選取] 窗格中,選取 [graph-simple:1 ],然後按一下 [套用]。
這很重要
此範例會使用您
graph-simple:1.0.0的 成品。 您可以 開發自己的 WASM 模組 ,並將其推送至容器登錄,以建立自訂圖形。 您推送至容器登錄的圖形可在 [圖形選取 ] 窗格中使用。您可以選取圖表中的圖形節點,以設定某些圖形運算子設定。 例如,您可以選取 模組溫度/地圖 運算子,然後輸入
key2值example-value-2。 按兩下 [套用 ] 以儲存變更。在資料流程圖中,選取 [目的地] 以設定目的地節點。
選取資料流程圖名稱下的 [儲存] 以儲存資料流程圖。
測試資料流程
若要測試資料流程,請從叢集內傳送 MQTT 訊息。 首先,遵循使用 MQTT 用戶端測試與 MQTT 代理程式的連線中的指示,部署 MQTT 用戶端 Pod。 MQTT 用戶端會提供驗證權杖和憑證,以連線到代理程式。 若要部署 MQTT 用戶端,請執行下列命令:
kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/samples/quickstarts/mqtt-client.yaml
傳送溫度訊息
在第一個終端機工作階段中,建立並執行指令碼,以華氏傳送溫度資料:
# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
# Create and run temperature.sh from within the MQTT client pod
while true; do
# Generate a random temperature value between 0 and 6000 Fahrenheit
random_value=$(shuf -i 0-6000 -n 1)
payload="{\"temperature\":{\"value\":$random_value,\"unit\":\"F\"}}"
echo "Publishing temperature: $payload"
# Publish to the input topic
mosquitto_pub -h aio-broker -p 18883 \
-m "$payload" \
-t "sensor/temperature/raw" \
-d \
--cafile /var/run/certs/ca.crt \
-D PUBLISH user-property __ts $(date +%s)000:0:df \
-D CONNECT authentication-method 'K8S-SAT' \
-D CONNECT authentication-data $(cat /var/run/secrets/tokens/broker-sat)
sleep 1
done'
備註
MQTT 使用者屬性 __ts 可用來將時間戳記新增至訊息,以確保使用混合式邏輯時鐘 (HLC) 及時處理訊息。 具有時間戳記可協助資料流程決定是否接受或捨棄訊息。 該屬性的格式是 <timestamp>:<counter>:<nodeid>。 它可讓資料流程處理更準確,但並非必要。
指令碼每秒會將隨機溫度資料發佈到 sensor/temperature/raw 主題。 它看起來應該如下所示:
Publishing temperature: {"temperature":{"value":1234,"unit":"F"}}
Publishing temperature: {"temperature":{"value":5678,"unit":"F"}}
讓指令碼保持執行中的狀態以繼續發佈溫度資料。
訂閱已處理的訊息
在第二個終端工作階段中 (也連線到 MQTT 用戶端 Pod),訂閱輸出主題以查看轉換的溫度值:
# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "sensor/temperature/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'
您會看到 WASM 模組從華氏轉換為攝氏的溫度資料。
{"temperature":{"value":1292.2222222222222,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
{"temperature":{"value":203.33333333333334,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
範例 2:部署複雜圖表
此範例示範複雜的資料處理工作流程,其中處理多種資料類型,包括溫度、濕度和影像資料。 複雜圖表定義會協調多個 WASM 模組,以執行進階分析和物件偵測。
運作方式
複雜圖表會處理三個資料流,並將其合併為擴充的感應器分析:
- 溫度處理:將華氏轉換為攝氏、篩選無效讀數,並計算統計資料
- 濕度處理:累積時間間隔的濕度測量
- 影像處理:對相機快照集執行物件偵測,並格式化結果
如需複雜圖形定義如何運作、其結構和數據流透過多個處理階段的詳細資訊,請參閱 範例 2:複雜圖形定義。
圖形會使用 Rust 範例中的特製化模組。
設定複雜的資料流程圖
此設定會使用 graph-complex:1.0.0 成品來實作多感應器處理工作流程。 請注意資料流程圖部署與範例 1 類似 - 兩者都使用相同的三節點模式 (來源、圖形處理器、目的地),即使處理邏輯不同也一樣。
出現這種相似性是因為資料流程圖資源會充當載入和執行圖表定義的主機環境。 實際的處理邏輯位於圖表定義成品 (graph-simple:1.0.0 與 graph-complex:1.0.0),其中包含 WASM 模組之間作業和連線的 YAML 規格。 資料流程圖資源會提供執行階段基礎結構來提取成品、具現化模組,以及透過定義的工作流程路由資料。
若要在 作業體驗中建立資料流程圖,請移至 [資料流程 ] 索引標籤。
選取 [+ 建立] 旁的下拉式功能表,然後選取 [建立資料流程圖]
選取佔位元名稱 new-data-flow 以設定數據流屬性。 輸入資料流程圖的名稱,然後選擇要使用的資料流程設定檔。
在資料流程圖中,選取 [ 來源 ] 以設定來源節點。 在 [來源詳細資料] 底下,選取 [資產] 或 [資料流程端點]。
如果您選取 資產,請選擇要從中提取資料的資產,然後按一下 套用。
如果您選取 [資料流程端點],請輸入下列詳細資料,然後按一下 [套用]。
Setting Description 資料流程端點 選取 [預設] 以使用預設 MQTT 訊息代理程式端點。 主題 用於訂閱傳入訊息的主題篩選條件。 使用 [主題]> [新增資料列] 來新增多個主題。 訊息結構描述 用於反序列化傳入訊息的結構描述。
在資料流程圖中,選取 [ 新增圖形轉換 (選擇性) ] 以新增圖形處理節點。 在 [圖形選取] 窗格中,選取 [圖形複雜:1 ],然後按一下 [ 套用]。
這很重要
此範例會使用您
graph-complex:1.0.0的 成品。 您可以 開發自己的 WASM 模組 ,並將其推送至容器登錄,以建立自訂圖形。 您推送至容器登錄的圖形可在 [圖形選取 ] 窗格中使用。您可以選取圖表中的圖形節點,以設定某些圖形運算子設定。
Operator Description 模組快照/分支 將 snapshot模組設定為對影像執行物件偵測。 您可以設定snapshot_topic組態索引鍵,以指定影像資料的輸入主題。模組溫度/地圖 將 key2溫度值轉換為不同的刻度。按兩下 [套用 ] 以儲存變更。
在資料流程圖中,選取 [目的地] 以設定目的地節點。
選取資料流程圖名稱下的 [儲存] 以儲存資料流程圖。
測試複雜的資料流程
在我們可以查看輸出之前,我們需要設定來源資料。
將原始影像檔上傳到 mqtt-client Pod
這些影像檔可供 snapshot 模組用來偵測影像中的物件。 它們位於 GitHub 上的 images 資料夾中。
首先,複製存放庫以取得對這些影像檔的存取權:
git clone https://github.com/Azure-Samples/explore-iot-operations.git
cd explore-iot-operations
若要將原始影像檔從 ./samples/wasm/images 資料夾上傳至 mqtt-client Pod,您可以使用下列命令:
kubectl cp ./samples/wasm/images azure-iot-operations/mqtt-client:/tmp
檢查檔案是否已上傳:
kubectl exec -it mqtt-client -n azure-iot-operations -- ls /tmp/images
您應該會在 /tmp/images 資料夾中看到檔案清單。
beaker.raw laptop.raw sunny2.raw
binoculars.raw lawnmower.raw sunny4.raw
broom.raw milkcan.raw thimble.raw
camera.raw photocopier.raw tripod.raw
computer_mouse.raw radiator.raw typewriter.raw
daisy3.raw screwdriver.raw vacuum_cleaner.raw
digital_clock.raw sewing_machine.raw
hammer.raw sliding_door.raw
發佈模擬的溫度、濕度資料並傳送影像
您可以將發佈溫度、濕度資料和傳送影像的命令合併到單一指令碼中。 使用下列命令:
# Connect to the MQTT client pod and run the script
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
while true; do
# Generate a random temperature value between 0 and 6000
temp_value=$(shuf -i 0-6000 -n 1)
temp_payload="{\"temperature\":{\"value\":$temp_value,\"unit\":\"F\"}}"
echo "Publishing temperature: $temp_payload"
mosquitto_pub -h aio-broker -p 18883 \
-m "$temp_payload" \
-t "sensor/temperature/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
# Generate a random humidity value between 30 and 90
humidity_value=$(shuf -i 30-90 -n 1)
humidity_payload="{\"humidity\":{\"value\":$humidity_value}}"
echo "Publishing humidity: $humidity_payload"
mosquitto_pub -h aio-broker -p 18883 \
-m "$humidity_payload" \
-t "sensor/humidity/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
# Send an image every 2 seconds
if [ $(( $(date +%s) % 2 )) -eq 0 ]; then
file=$(ls /tmp/images/*.raw | shuf -n 1)
echo "Sending file: $file"
mosquitto_pub -h aio-broker -p 18883 \
-f $file \
-t "sensor/images/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
fi
# Wait for 1 second before the next iteration
sleep 1
done'
查看輸出
在新的終端機中,訂閱輸出主題:
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "analytics/sensor/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'
輸出應該會顯示如下
{"temperature":[{"count":9,"max":2984.4444444444443,"min":248.33333333333337,"average":1849.6296296296296,"last":2612.222222222222,"unit":"C","overtemp":true}],"humidity":[{"count":10,"max":76.0,"min":30.0,"average":49.7,"last":38.0}],"object":[{"result":"milk can; broom; screwdriver; binoculars, field glasses, opera glasses; toy terrier"}]}
{"temperature":[{"count":10,"max":2490.5555555555557,"min":430.55555555555554,"average":1442.6666666666667,"last":1270.5555555555557,"unit":"C","overtemp":true}],"humidity":[{"count":9,"max":87.0,"min":34.0,"average":57.666666666666664,"last":42.0}],"object":[{"result":"broom; Saint Bernard, St Bernard; radiator"}]}
在這裡,該輸出包含了溫度和濕度資料,以及影像中偵測到的物件。
更新運行中的圖中的模組
你可以在不停止圖形的情況下更新 WASM 模組。 當你想更新運算子的邏輯而不停止資料流時,這很有用。 例如,要將溫度轉換模組從版本 1.0.0 更新到 2.0.0,請上傳新版本如下:
oras push <YOUR_ACR_NAME>.azurecr.io/temperature:2.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm temperature-2.0.0.wasm:application/wasm
資料流程圖會自動接收模組的新版本,無需額外設定。 圖可持續不中斷運行,並使用新模組版本進行後續資料處理。
開發自訂 WASM 模組
若要為您的資料流程圖建立自訂資料處理邏輯,請在 Rust 或 Python 中開發 WebAssembly 模組。 自訂模組可讓您實作內建運算子中無法使用的特殊化商務規則、資料轉換和分析。
如需完整的開發指引,包括:
- 設定開發環境
- 在 Rust 和 Python 中建立運算子
- 了解資料模型和介面
- 建置和測試模組
如需建立和設定定義數據處理工作流程之 YAML 圖形定義的詳細資訊,請參閱 設定 WebAssembly 圖形定義。
自訂資料流程圖的設定
本節提供使用 WASM 模組設定資料流程圖的詳細資訊。 其涵蓋所有設定選項、資料流程端點和進階設定。
資料流程圖概觀
資料流程圖會定義資料如何流經 WebAssembly 模組進行處理。 每個圖表都包含:
- 控制圖形是否啟用或停用的模式
- 連結至資料流程設定檔的設定檔參考,而此設定檔會定義縮放和資源設定
- 可選地為圖狀態啟用持久存儲的磁碟持續性
- 定義來源、處理和目的地元件的節點
- 指定節點之間數據流方式的節點連線
模式設定
模式屬性會決定資料流程圖是否正在主動處理資料。 您可以將模式設定為 Enabled 或 Disabled (不區分大小寫)。 停用時,圖表會停止處理資料,但會保留其設定。
建立或編輯資料流程圖表時,在 資料流程屬性 窗格中,您可以核取 啟用資料流程 為是 ,將 模式設定為 Enabled。 如果您未勾選它,則模式會設定為 Disabled。
設定檔參考
設定檔參考會將您的資料流程圖連線到資料流程設定檔,此設定檔會定義縮放設定、執行個體計數和資源限制。 如果您未指定設定檔參考,則必須改用 Kubernetes 擁有者參考。 大部分案例都會使用 Azure IoT 操作所提供的預設設定檔。
建立或編輯資料流程圖表時,請在 [資料流程屬性 ] 窗格中,選取資料流程設定檔。 預設數據流配置檔會自動被選取。 如需數據流配置檔的詳細資訊,請參閱 設定數據流配置檔。
這很重要
您只能在建立資料流程圖時選擇資料流設定檔。 建立資料流程圖表之後,您無法變更資料流程設定檔。 如果您想要變更現有資料流程圖的資料流程設定檔,請刪除原始資料流程圖,並使用新的資料流程設定檔建立新的資料流程圖。
要求磁碟持續性
這很重要
資料流程圖的磁碟持續性有已知問題。 此功能目前無法如預期般運作。 如需詳細資訊,請參閱
要求磁碟持續性可讓資料流程圖在重新啟動時維持狀態。 當您啟用這項功能時,如果連線的代理程式重新啟動,圖表就可以復原處理狀態。 這項功能對於遺失中繼資料帶來問題的具狀態處理案例很有用。 當您啟用要求磁碟持續性時,代理程式會將 MQTT 資料 (例如訂閱者佇列中的訊息) 保存到磁碟。 此方法可確保資料流程的資料來源不會在斷電或代理程式重新啟動期間發生資料遺失。 代理程式會維護最佳效能,因為持續性是根據資料流程設定的,因此只有需要持續性的資料流程才會使用這項功能。
資料流程圖會在訂閱期間使用 MQTTv5 使用者屬性提出此持續性要求。 這項功能只有在下列情況下才會運作:
- 資料流程使用 MQTT 代理程式作為來源 (具有 MQTT 端點的來源節點)
- MQTT 代理程式已啟用持續性,而動態持續性模式已針對資料類型設定為
Enabled,例如訂閱者佇列
此設定可讓資料流程圖之類的 MQTT 用戶端,使用 MQTTv5 使用者屬性為其訂用帳戶要求磁碟持續性。 如需詳細的 MQTT 代理程式持續性設定,請參閱設定 MQTT 代理程式持續性。
此設定接受 Enabled 或 Disabled,而 Disabled 做為預設值。
建立或編輯資料流程圖表時,在 [資料流程屬性 ] 窗格中,您可以核取 [要求資料持續性] 為 [ 是 ],將要求磁碟持續性設定為 Enabled。 如果未勾選,則設定為 Disabled。
節點組態
節點是資料流程圖的建置組塊。 每個節點在圖表內都有唯一的名稱,並執行特定函式。 有三種類型的節點:
來源節點
來源節點定義資料進入圖表的位置。 它們連線到從 MQTT 代理程式或 Kafka 主題接收資料的資料流程端點。 每個來源節點都必須指定:
- 指向已設定資料流端點的端點引用
- 資料來源做為要訂閱的 MQTT 主題或 Kafka 主題清單
- 連結至 Azure Device Registry 資產以進行架構推斷的資產參考 (選擇性)
資料來源陣列可讓您訂閱多個主題,而不需要修改端點設定。 這種彈性可讓端點在不同資料流程之間重複使用。
備註
目前,僅支援 MQTT 和 Kafka 端點作為資料流程圖的資料來源。 如需詳細資訊,請參閱 設定資料流程端點。
在資料流程圖中,選取 [ 來源 ] 以設定來源節點。 在 [來源詳細資料] 底下,選取 [資料流程端點],然後使用 [主題] 欄位來指定要訂閱傳入訊息的 MQTT 主題篩選器。 您可以選取 [新增資料列] 並輸入新主題,以新增多個 MQTT 主題。
圖表處理節點
圖表處理節點包含轉換資料的 WebAssembly 模組。 這些節點會從容器登錄中提取 WASM 成品,並使用指定的設定參數執行這些成品。 每個圖表節點都需要:
- 登錄端點參考,指向用於提取成品的登錄端點
- 成品規格,定義要提取的的模組名稱和版本
- 作為鍵值對傳遞給 WASM 模組的配置參數
設定陣列可讓您自訂模組行為,而不需要重建 WASM 成品。 常見的設定選項包括處理參數、閾值、轉換設定和功能旗標。
在資料流程圖中,選取 [ 新增圖形轉換 (選擇性) ] 以新增圖形處理節點。 在 圖表選取 窗格中,選取所需的圖形構件 (簡單或複雜圖表),然後按一下 套用。 您可以選取圖表中的圖形節點,以設定某些圖形運算子設定。
設定機碼值組會在執行階段傳遞至 WASM 模組。 模組可以存取這些值來自訂其行為。 此方法可讓您:
- 使用不同的設定來部署相同的 WASM 模組
- 在不重建模組的情況下調整處理參數
- 根據部署需求啟用或停用功能
- 設定環境特定值,例如閾值或端點
目的地節點
目的地節點會定義將已處理資料傳送至其中的位置。 它們會連線到將資料傳送至 MQTT 代理程式、雲端儲存體或其他系統的資料流程端點。 每個目的地節點都會指定:
- 指向已設定資料流端點的端點引用
- 數據目的地作為輸出數據的特定主題、路徑或位置
- 定義序列化格式和架構驗證的輸出架構設定(選擇性)
針對 Azure Data Lake 或 Fabric OneLake 等儲存體目的地,您可以指定輸出結構描述設定,來控制資料的序列化和驗證方式。
備註
目前,僅支援 MQTT、Kafka 和 OpenTelemetry 端點作為資料流程圖的資料目的地。 如需詳細資訊,請參閱 設定資料流程端點。
- 在資料流程圖中,選取 [目的地] 節點。
- 從 [資料流程端點詳細資料 ] 下拉式清單中選取所需的資料流程端點。
- 選取 [繼續] 以設定目的地。
- 輸入目的地的 必要設定 ,包括要傳送資料的主題或表格。 資料目的地欄位會根據端點型別自動解譯。 例如,如果資料流程端點是 MQTT 端點,則目的地詳細資料頁面會提示您輸入主題。
節點連線
節點連線會定義節點之間的資料流程路徑。 每個連線都會指定來源節點和目的地節點,從而建立處理管線。 連線可以選擇性地包含結構描述驗證,以確保處理階段之間的資料完整性。
當您指定結構描述驗證時,系統會在資料於節點之間流動時驗證其格式和結構。 驗證有助於及早發現資料不一致,並確保 WASM 模組以預期格式接收資料。
當您選取圖形處理節點時,作業體驗會自動建立節點連線。 建立圖表之後,您無法修改連線。
資料流程端點
資料流程圖會透過資料流程端點連線到外部系統。 端點的類型會決定其是否可以當做來源、目的地或兩者使用:
MQTT 端點
MQTT 端點可以同時充當來源和目的地。 它們會連線到 MQTT 代理程式,包括:
- Azure IoT 操作本機 MQTT 代理程式 (每個資料流程中都需要)
- Azure 事件方格 MQTT
- 自訂 MQTT 代理程式
如需詳細的設定資訊,請參閱設定 MQTT 資料流程端點。
Kafka 端點
Kafka 端點可以同時充當來源和目的地。 它們會連線到與 Kafka 相容的系統,包括:
- Azure 事件中樞 (與 Kafka 相容)
- Apache Kafka 叢集
- 匯流雲
如需詳細的設定資訊,請參閱設定 Azure 事件中樞和 Kafka 資料流程端點。
儲存體端點
儲存體端點只能充當目的地。 它們會連線到雲端儲存體系統,以進行長期資料保留和分析:
- Azure Data Lake 儲存體
- Microsoft Fabric OneLake
- 本機記憶體
儲存體端點通常需要輸出結構描述設定,來定義資料序列化格式。
登錄端點
登錄端點提供容器登錄的存取權,以提取 WASM 模組和圖表定義。 它們不會直接在資料流程中使用,但圖表處理節點會參考它們。
如需詳細的設定資訊,請參閱設定登錄端點。