重要事項
此頁面包含使用 Kubernetes 部署資訊清單來管理 Azure IoT 操作元件的指示,其處於預覽狀態。 此功能以數項限制提供,不應用於生產工作負載。
請參閱 Microsoft Azure 預覽版增補使用規定,以了解適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的法律條款。
若要設定 Azure IoT 操作與 Apache Kafka 代理程式之間的雙向通訊,您可以設定資料流程端點。 此組態可讓您指定端點、傳輸層安全性 (TLS)、驗證和其他設定。
先決條件
Azure 事件中樞
Azure 事件中樞與 Kafka 通訊協定相容,而且可與資料流程搭配使用,但有一些限制。
建立 Azure 事件中樞命名空間和事件中樞
首先,建立已啟用 Kafka 的 Azure 事件中樞命名空間
接下來,在命名空間中建立事件中樞。 每個個別的事件中樞都會對應至 Kafka 主題。 您可以在相同的命名空間中建立多個事件中樞來代表多個 Kafka 主題。
將權限指派給受控識別
若要設定 Azure 事件中樞的資料流程端點,建議您使用使用者指派或系統指派的受控識別。 這種方法是安全的,而且不需要手動管理認證。
建立 Azure 事件中樞命名空間和事件中樞之後,您必須將角色指派給 Azure IoT Operations 受控識別,以授與傳送或接收訊息給事件中樞的許可權。
如果使用系統指派的受控識別,請在 Azure 入口網站中移至您的 Azure IoT 操作執行個體,然後選取 [概觀]。 複製 Azure IoT 操作Arc 延伸模組之後所列的延伸模組名稱。 例如,azure-iot-operations-xxxx7。 使用 Azure IoT 操作Arc 延伸模組的相同名稱,即可找到系統指派的受控識別。
然後,移至 [事件中樞] 命名空間 > [存取控制 (IAM)]>[新增角色指派]。
- 在 [ 角色] 索引標籤上,選取適當的角色,例如
Azure Event Hubs Data Sender 或 Azure Event Hubs Data Receiver。 這會為受控識別提供傳送或接收命名空間中所有事件中樞訊息所需的權限。 若要深入了解,請參閱使用 Microsoft Entra ID 驗證應用程式以存取事件中樞資源。
- 在 [成員] 索引標籤上:
- 如果使用系統指派的受控識別,對於 [存取權指派對象為],請選取 [使用者、群組或服務主體] 選項,然後選取 [+ 選取成員] 並搜尋 Azure IoT 操作 Arc 延伸模組的名稱。
- 如果使用使用者指派的受控識別,對於 [存取權指派對象為],請選取 [受控識別] 選項,然後選取 [+ 選取成員] 並搜尋針對雲端連線設定的使用者指派的受控識別。
建立 Azure 事件中樞的資料流程端點
設定 Azure 事件中樞命名空間和事件中樞之後,您可以為已啟用 Kafka 的 Azure 事件中樞命名空間建立數據流端點。
在 [作業體驗] 中,選取 [資料流程端點] 索引標籤。
在 [建立新的資料流程端點] 下,選取 [Azure 事件中樞]>[新增]。
輸入端點的下列設定:
| 設定 |
描述 |
| 名稱 |
資料流端點的名稱。 |
| 主辦人 |
事件中樞主機的主機名稱。 您可以搜尋現有的事件中樞主機,或使用 格式 <NAMESPACE>.servicebus.windows.net手動輸入主機名。 |
| 港口 |
事件中樞主機的連接埠。 對於事件中樞,連接埠為 9093。 |
| 驗證方法 |
用於驗證的方法。 我們建議您選擇系統指派的受控識別或使用者指派的受控識別。 |
選取 [套用] 以佈建端點。
建立或取代
使用 az iot ops dataflow endpoint create eventgrid 命令來建立或取代 Azure 事件中樞數據流端點。
az iot ops dataflow endpoint create eventhub --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --eventhub-namespace <EventHubsNamespace>
此命令會建立具有預設設定的 Azure 事件中樞端點。 您可以視需要指定其他選項。
以下是建立或取代名為 event-hub-endpoint的 Azure 事件中樞數據流端點的範例命令:
az iot ops dataflow endpoint create eventhub --resource-group myResourceGroup --instance myAioInstance --name event-hub-endpoint --eventhub-namespace myeventhubnamespace
建立或變更
使用 az iot ops dataflow endpoint apply 命令來建立或變更 Azure 事件中樞數據流端點。
az iot ops dataflow endpoint apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --config-file <ConfigFilePathAndName>
參數 --config-file 是 JSON 組態檔的路徑和檔名,其中包含資源屬性。
在此範例中,假設名為 event-hubs-endpoint.json 的組態檔,其中包含儲存在使用者主目錄中的下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"host": "myeventhubnamespace.servicebus.windows.net:9093",
"authentication": {
"method": "SystemAssignedManagedIdentity",
"systemAssignedManagedIdentitySettings": {}
},
"tls": {
"mode": "Enabled"
}
}
}
以下是建立或更新名為 event-hubs-endpoint的 Azure 事件中樞數據流端點的範例命令:
az iot ops dataflow endpoint apply --resource-group myResourceGroup --instance myAioInstance --name event-hubs-endpoint --config-file ~/event-hubs-endpoint.json
使用下列內容建立 Bicep .bicep 檔案。
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param endpointName string = '<ENDPOINT_NAME>'
param hostName string = '<NAMESPACE>.servicebus.windows.net:9093'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource kafkaEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' = {
parent: aioInstance
name: endpointName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
endpointType: 'Kafka'
kafkaSettings: {
host: hostName
authentication: {
// See available authentication methods section for method types
// method: <METHOD_TYPE>
}
tls: {
mode: 'Enabled'
}
}
}
}
然後,透過 Azure CLI 進行部署。
az deployment group create --resource-group <RESOURCE_GROUP> --template-file <FILE>.bicep
使用下列內容建立 Kubernetes 資訊清單 .yaml 檔案。
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowEndpoint
metadata:
name: <ENDPOINT_NAME>
namespace: azure-iot-operations
spec:
endpointType: Kafka
kafkaSettings:
host: <NAMESPACE>.servicebus.windows.net:9093
authentication:
# See available authentication methods section for method types
# method: <METHOD_TYPE>
tls:
mode: Enabled
然後將資訊清單檔套用至 Kubernetes 叢集。
kubectl apply -f <FILE>.yaml
附註
當您建立資料流程時,稍後會設定 Kafka 主題或個別的事件中樞。 Kafka 主題是資料流程訊息的目的地。
使用連接字串來對事件中樞進行驗證
重要事項
若要使用作業體驗 Web UI 來管理秘密,必須先設定 Azure Key Vault 並啟用工作負載身分識別,以使用安全設定來啟用 Azure IoT 作業。 若要深入了解,請參閱在 Azure IoT 操作部署中啟用安全設定。
在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[SASL]。
輸入端點的下列設定:
| 設定 |
描述 |
| SASL 類型 |
選擇 Plain。 |
| 已同步的祕密名稱 |
輸入包含連接字串的 Kubernetes 秘密名稱。 |
| 使用者名稱參考或權杖密碼 |
用於 SASL 驗證的使用者名稱參考或權杖密碼。 請從 Key Vault 清單中挑選它或建立一個新的。 值必須是 $ConnectionString。 |
| 權杖密碼的密碼參考 |
用於 SASL 驗證的密碼參考或權杖密碼。 請從 Key Vault 清單中挑選它或建立一個新的。 值的格式必須是 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>。 |
選取 [新增參考] 之後,如果您選取 [建立新的],請輸入下列設定:
| 設定 |
描述 |
| 祕密名稱 |
Azure Key Vault 中的祕密名稱。 挑選容易記住的名稱,以便稍後從清單中選取祕密。 |
| 祕密值 |
對於使用者名稱,請輸入 $ConnectionString。 對於密碼,請輸入格式為 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> 的連接字串。 |
| 設定啟用日期 |
若已開啟,則為祕密變成作用中的日期。 |
| 設定到期日期 |
如果已開啟,則為祕密到期的日期。 |
若要深入了解秘密,請參閱在 Azure IoT 操作中建立和管理秘密。
建立或取代
使用 az iot ops dataflow endpoint create eventgrid 命令來建立或取代 Azure 事件中樞數據流端點。
az iot ops dataflow endpoint create eventhub --auth-type Sasl --sasl-type <SaslType> --secret-name <SecretName> --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --eventhub-namespace <EventHubsNamespace>
此命令會建立具有預設設定的 Azure 事件中樞端點。 您可以視需要指定其他選項。
以下是建立或取代名為 event-hub-endpoint的 Azure 事件中樞數據流端點的範例命令:
az iot ops dataflow endpoint create eventhub --resource-group myResourceGroup --instance myAioInstance --name event-hub-endpoint --eventhub-namespace myeventhubnamespace
建立或變更
使用 az iot ops dataflow endpoint apply 命令來建立或變更 Azure 事件中樞數據流端點。
az iot ops dataflow endpoint apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --config-file <ConfigFilePathAndName>
參數 --config-file 是 JSON 組態檔的路徑和檔名,其中包含資源屬性。
在此範例中,假設名為 event-hubs-endpoint.json 的組態檔,其中包含儲存在使用者主目錄中的下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"host": "myeventhubnamespace.servicebus.windows.net:9093",
"authentication": {
"method": "SystemAssignedManagedIdentity",
"systemAssignedManagedIdentitySettings": {}
},
"tls": {
"mode": "Enabled"
}
}
}
以下是建立或更新名為 event-hubs-endpoint的 Azure 事件中樞數據流端點的範例命令:
az iot ops dataflow endpoint apply --resource-group myResourceGroup --instance myAioInstance --name event-hubs-endpoint --config-file ~/event-hubs-endpoint.json
kafkaSettings: {
authentication: {
method: 'Sasl'
saslSettings: {
saslType: 'Plain'
secretRef: '<SECRET_NAME>'
}
}
tls: {
mode: 'Enabled'
}
}
若要使用連接字串來對事件中樞進行驗證,請使用 SASL 驗證方法並將 SASL 類型設定為 "Plain",然後設定包含連接字串的秘密名稱。
首先,建立包含連接字串的 Kubernetes 秘密。 祕密必須與 Kafka 資料流程端點位於同一個命名空間中。 祕密必須具有使用者名稱和密碼作為索引鍵/值組。 例如:
kubectl create secret generic <SECRET_NAME> -n azure-iot-operations \
--from-literal=username='$ConnectionString' \
--from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>'
秘訣
將連接字串的範圍設定到命名空間 (而不是個別的事件中樞),可讓資料流程從多個不同的事件中樞和 Kafka 主題中傳送和接收訊息。
kafkaSettings:
authentication:
method: Sasl
saslSettings:
saslType: Plain
secretRef: <SECRET_NAME>
tls:
mode: Enabled
限制
Azure 事件中樞不支援 Kafka 支援的所有壓縮類型。 Azure 事件中樞進階層和專用層中目前僅支援 GZIP 壓縮。 使用其他壓縮類型可能會導致錯誤。
自訂 Kafka 代理程式
若要為非事件中樞 Kafka 代理程式設定資料流程端點,請根據需要設定主機、TLS、驗證和其他設定。
在 [作業體驗] 中,選取 [資料流程端點] 索引標籤。
在 [建立新的資料流程端點] 下,選取 [自訂 Kafka 代理程式]>[新增]。
輸入端點的下列設定:
| 設定 |
描述 |
| 名稱 |
資料流端點的名稱。 |
| 主辦人 |
格式為 <Kafka-broker-host>:xxxx 的 Kafka 代理程式的主機名稱。 在主機設定中包含連接埠號碼。 |
| 驗證方法 |
用於驗證的方法。 選擇 SASL。 |
| SASL 類型 |
SASL 驗證的類型。 選擇 Plain、ScramSha256 或 ScramSha512。 如果使用 SASL,則為必要項。 |
| 已同步的祕密名稱 |
秘密的名稱。 如果使用 SASL,則為必要項。 |
| 權杖密碼的使用者名稱參考 |
SASL 權杖密碼中使用者名稱的參考。 如果使用 SASL,則為必要項。 |
選取 [套用] 以佈建端點。
附註
目前,作業體驗不支援使用 Kafka 資料流程端點作為來源。 您可以使用 Kubernetes 或 Bicep 來建立具有來源 Kafka 資料流程端點的資料流程。
建立或取代
使用 az iot ops dataflow endpoint create custom-kafka 命令來建立或取代自定義 Kafka 訊息代理程序數據流端點。
az iot ops dataflow endpoint create custom-kafka --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --hostname <KafkaBrokerHostName> --port <Port>
此命令會建立具有預設設定的自定義 Kafka 訊息代理程式端點。 您可以視需要指定其他選項。
以下是建立或取代名為 custom-kafka-endpoint的自訂 Kafka 訊息代理程式資料流端點的範例命令:
az iot ops dataflow endpoint create custom-kafka --resource-group myResourceGroup --instance myAioInstance --name custom-kafka-endpoint --hostname mykafkabroker --port 9092
建立或變更
使用 az iot ops dataflow endpoint apply 命令來建立或變更自定義 Kafka 訊息代理程式數據流端點。
az iot ops dataflow endpoint apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName> --config-file <ConfigFilePathAndName>
參數 --config-file 是 JSON 組態檔的路徑和檔名,其中包含資源屬性。
在此範例中,假設名為 custom-kafka-endpoint.json 的組態檔,其中包含儲存在使用者主目錄中的下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"host": "mykafkabroker:9092",
"authentication": {
"method": "SystemAssignedManagedIdentity",
"systemAssignedManagedIdentitySettings": {}
},
"tls": {
"mode": "Enabled"
}
}
}
以下是建立或更新名為 custom-kafka-endpoint之自定義 Kafka 訊息代理程式資料流端點的範例命令:
az iot ops dataflow endpoint apply --resource-group myResourceGroup --instance myAioInstance --name custom-kafka-endpoint --config-file ~/custom-kafka-endpoint.json
resource kafkaEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' = {
parent: aioInstance
name: '<ENDPOINT NAME>'
extendedLocation: {
name: '<CUSTOM_LOCATION_NAME>'
type: 'CustomLocation'
}
properties: {
endpointType: 'Kafka'
host: '<KAFKA-HOST>:<PORT>'
kafkaSettings: {
authentication: {
method: 'Sasl'
saslSettings: {
saslType: '<TYPE>'
secretRef: '<SECRET_NAME>'
}
}
tls: {
mode: 'Enabled'
}
}
}
}
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowEndpoint
metadata:
name: <ENDPOINT NAME>
namespace: azure-iot-operations
spec:
endpointType: Kafka
kafkaSettings:
host: <KAFKA-HOST>:<PORT>
authentication:
method: Sasl
saslSettings:
saslType: <TYPE>
secretRef: <SECRET_NAME>
tls:
mode: Enabled
若要自訂端點設定,請使用下列各節來取得詳細資訊。
可用的驗證方法
下列驗證方法可用於 Kafka 代理程式資料流程端點。
系統指派的受控識別
設定資料流端點之前,請先將角色指派給 Azure IoT 操作受控識別,其會授與連線至 Kafka 代理程式的權限:
- 在 Azure 入口網站中,移至您的 Azure IoT 操作執行個體,然後選取 [概觀]。
- 複製 Azure IoT 操作Arc 延伸模組之後所列的延伸模組名稱。 例如,azure-iot-operations-xxxx7。
- 移至您需要授與權限的雲端資源。 例如,移至 [事件中樞] 命名空間 > [存取控制 (IAM)]>[新增角色指派]。
- 在 [ 角色] 索引標籤上,選取適當的角色。
- 在 [成員] 索引標籤上,對於 [存取權指派對象為],請選取 [使用者、群組或服務主體] 選項,然後選取 [+ 選取成員] 並搜尋 Azure IoT 操作受控識別。 例如,azure-iot-operations-xxxx7。
然後,使用系統指派的受控識別設定來設定資料流端點。
在 [作業體驗資料流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[系統指派的受控識別]。
建立或取代
使用 az iot ops dataflow endpoint create custom-kafka 命令,並將 --auth-type 參數設定為 SystemAssignedManagedIdentity 進行系統指派的受控識別驗證。
az iot ops dataflow endpoint create <Command> --auth-type SystemAssignedManagedIdentity --audience <Audience> --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName>
建立或變更
使用 az iot ops dataflow endpoint apply 命令並指定 --config-file 參數。
在此範例中,假設組態檔具有下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"host": "mykafkabroker:9092",
"authentication": {
"method": "SystemAssignedManagedIdentity",
"systemAssignedManagedIdentitySettings": {}
},
"tls": {
"mode": "Enabled"
}
}
}
kafkaSettings: {
authentication: {
method: 'SystemAssignedManagedIdentity'
systemAssignedManagedIdentitySettings: {}
}
}
kafkaSettings:
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
{}
此組態會建立具有預設對象的受控識別,該對象與 https://<NAMESPACE>.servicebus.windows.net 格式的事件中樞命名空間主機值相同。 不過,如果您需要覆寫預設對象,您可以將 audience 欄位設定為所需的值。
{
"endpointType": "Kafka",
"kafkaSettings": {
"authentication": {
"method": "SystemAssignedManagedIdentity",
"systemAssignedManagedIdentitySettings": {
"audience": "audience"
}
}
}
}
kafkaSettings: {
authentication: {
method: 'SystemAssignedManagedIdentity'
systemAssignedManagedIdentitySettings: {
audience: '<YOUR_AUDIENCE_OVERRIDE_VALUE>'
}
}
}
kafkaSettings:
authentication:
method: SystemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
audience: <YOUR_AUDIENCE_OVERRIDE_VALUE>
使用者指派的受控識別
若要將使用者指派的受控識別用於驗證,您必須先部署已啟用安全設定的 Azure IoT 操作。 然後,您必須為雲端連線設定使用者指派的受控識別。 若要深入了解,請參閱在 Azure IoT 操作部署中啟用安全設定。
在設定資料流程端點之前,請為使用者指派的受控識別指派一個角色,以授與連接到 Kafka 代理程式的權限:
- 在 Azure 入口網站中,移至您需要授與權限的雲端資源。 例如,移至 [事件方格] 命名空間 > [存取控制 (IAM)]>[新增角色指派]。
- 在 [ 角色] 索引標籤上,選取適當的角色。
- 在 [成員] 索引標籤上,對於 [存取權指派對象為],請選取 [受控識別] 選項,然後選取 [+ 選取成員] 並搜尋使用者指派的受控識別。
然後,使用使用者指派的受控識別設定來設定資料流端點。
在 [作業體驗資料流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[使用者指派的受控識別]。
建立或取代
使用 az iot ops dataflow endpoint create 命令,並將 --auth-type 參數設定 UserAssignedManagedIdentity 為 ,以搭配使用者指派的受控識別驗證。
az iot ops dataflow endpoint create <Command> --auth-type UserAssignedManagedIdentity --client-id <ClientId> --tenant-id <TenantId> --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName>
建立或變更
使用 az iot ops dataflow endpoint apply 搭配 --config-file 參數
在此範例中,假設組態檔具有下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"authentication": {
"method": "UserAssignedManagedIdentity",
"userAssignedManagedIdentitySettings": {
"clientId": "<ID>",
"tenantId": "<ID>",
// Optional
"scope": "https://<Scope_Url>"
}
}
}
}
kafkaSettings: {
authentication: {
method: 'UserAssignedManagedIdentity'
UserAssignedManagedIdentitySettings: {
clientId: '<CLIENT_ID>'
tenantId: '<TENANT_ID>'
// Optional, defaults to https://<NAMESPACE>.servicebus.windows.net/.default
// Matching the Event Hubs namespace you configured as host
// scope: 'https://<SCOPE_URL>'
}
}
...
}
kafkaSettings:
authentication:
method: UserAssignedManagedIdentity
userAssignedManagedIdentitySettings:
clientId: <CLIENT_ID>
tenantId: <TENANT_ID>
# Optional, defaults to https://<NAMESPACE>.servicebus.windows.net/.default
# Matching the Event Hubs namespace you configured as host
# scope: https://<SCOPE_URL>
在這裡,範圍是受控識別的對象。 預設值與事件中樞命名空間主機值相同,格式為 https://<NAMESPACE>.servicebus.windows.net。 不過,如果您需要覆寫預設對象,您可以使用 Bicep 或 Kubernetes,將範圍欄位設定為所需的值。
SASL
若要使用 SASL 進行驗證,請指定 SASL 驗證方法,並設定 SASL 類型和包含 SASL 權杖的祕密名稱的秘密參考。
在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[SASL]。
輸入端點的下列設定:
| 設定 |
描述 |
| SASL 類型 |
要使用的 SASL 驗證類型。 支援的類型為 Plain、ScramSha256 和 ScramSha512。 |
| 已同步的祕密名稱 |
包含 SASL 權杖的 Kubernetes 秘密名稱。 |
| 使用者名稱參考或權杖密碼 |
用於 SASL 驗證的使用者名稱參考或權杖密碼。 |
| 權杖密碼的密碼參考 |
用於 SASL 驗證的密碼參考或權杖密碼。 |
建立或取代
使用 az iot ops dataflow endpoint create 命令,並將 --auth-type 參數設定 Sasl 為 進行 SASL 驗證。
az iot ops dataflow endpoint create <Command> --auth-type Sasl --sasl-type <SaslType> --secret-name <SecretName> --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName>
建立或變更
使用 az iot ops dataflow endpoint apply 搭配 --config-file 參數
在此範例中,假設組態檔具有下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"authentication": {
"method": "Sasl",
"saslSettings": {
"saslType": "<SaslType>",
"secretRef": "<SecretName>"
}
}
}
}
kafkaSettings: {
authentication: {
method: 'Sasl' // Or ScramSha256, ScramSha512
saslSettings: {
saslType: 'Plain' // Or ScramSha256, ScramSha512
secretRef: '<SECRET_NAME>'
}
}
}
kubectl create secret generic sasl-secret -n azure-iot-operations \
--from-literal=token='<YOUR_SASL_TOKEN>'
kafkaSettings:
authentication:
method: Sasl
saslSettings:
saslType: Plain # Or ScramSha256, ScramSha512
secretRef: <SECRET_NAME>
支援的 SASL 類型如下:
Plain
ScramSha256
ScramSha512
祕密必須與 Kafka 資料流程端點位於同一個命名空間中。 秘密必須具有 SASL 權杖作為索引鍵/值組。
匿名
若要使用匿名驗證,請更新 Kafka 設定的驗證區段以使用匿名方法。
在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[無]。
建立或取代
使用 az iot ops dataflow endpoint create 命令,其中包含可進行匿名驗證的 --no-auth 參數。
az iot ops dataflow endpoint create <Command> --no-auth --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <EndpointName>
建立或變更
使用 az iot ops dataflow endpoint apply 搭配 --config-file 參數。
在此範例中,假設組態檔具有下列內容:
{
"endpointType": "Kafka",
"kafkaSettings": {
"authentication": {
"method": "Anonymous"
}
}
}
kafkaSettings: {
authentication: {
method: 'Anonymous'
}
}
kafkaSettings:
authentication:
method: Anonymous
anonymousSettings:
{}
進階設定
您可以設定 Kafka 資料流程端點的進階設定,例如 TLS、受信任的 CA 憑證、Kafka 傳訊設定、批次處理和 CloudEvents。 您可以在資料流程端點 [進階] 入口網站索引標籤中或資料流程端點資源內設定這些設定。
在作業體驗中,選取資料流端點的 [進階] 索引標籤。
{
"endpointType": "Kafka",
"kafkaSettings": {
"batching": {
"latencyMs": 10,
"maxBytes": 200000,
"maxMessages": 20000,
"mode": "Enabled"
},
"cloudEventAttributes": "CreateOrRemap",
"compression": "Gzip",
"consumerGroupId": "<ID>",
"copyMqttProperties": "Enabled",
"host": "mykafkabroker:9092",
"kafkaAcks": "Zero",
"partitionStrategy": "Static",
"tls": {
"mode": "Enabled",
"trustedCaCertificateConfigMapRef": "<YOUR_CA_CERTIFICATE>"
}
}
}
在 kafkaSettings 下,您可以設定 Kafka 端點的其他設定。
// See sections below for more details
kafkaSettings: {
consumerGroupId: '<ID>'
compression: 'Gzip'
copyMqttProperties: 'Disabled'
kafkaAcks: 'All'
partitionStrategy: 'Default'
tls: {
mode: 'Enabled'
trustedCaCertificateConfigMapRef: '<YOUR_CA_CERTIFICATE>'
}
batching: {
mode: 'Enabled'
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
}
}
在 kafkaSettings 下,您可以設定 Kafka 端點的其他設定。
# See sections below for more details
kafkaSettings:
consumerGroupId: <ID>
compression: Gzip
copyMqttProperties: Disabled
kafkaAcks: All
partitionStrategy: Default
tls:
mode: Enabled
trustedCaCertificateConfigMapRef: <YOUR_CA_CERTIFICATE>
batching:
mode: Enabled
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
TLS 設定
TLS 模式
若要啟用或停用 Kafka 端點的 TLS,請更新 TLS 設定中的 mode 設定。
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [啟用 TLS 模式] 旁的核取方塊。
{
"endpointType": "Kafka",
"kafkaSettings": {
"tls": {
"mode": "Enabled",
}
}
}
kafkaSettings: {
tls: {
mode: 'Enabled' // Or Disabled
}
}
kafkaSettings:
tls:
mode: Enabled # Or Disabled
TLS 模式可以設定為 Enabled 或 Disabled。 如果模式設定為 Enabled,則資料流程會使用安全的連線到 Kafka 代理程式。 如果模式設定為 Disabled,則資料流程會使用不安全的連線到 Kafka 代理程式。
受信任的 CA 憑證
為 Kafka 端點設定受信任的 CA 憑證,以建立安全的連線到 Kafka 代理程式。 如果 Kafka 代理程式使用自我簽署憑證或由預設不受信任的自訂 CA 所簽署的憑證,則此設定就很重要。
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [信任的 CA 憑證組態對應] 欄位來指定包含受信任 CA 憑證的 ConfigMap。
{
"endpointType": "Kafka",
"kafkaSettings": {
"tls": {
"mode": "Enabled",
"trustedCaCertificateConfigMapRef": "<YOUR_CA_CERTIFICATE>"
}
}
}
kafkaSettings: {
tls: {
trustedCaCertificateConfigMapRef: '<YOUR_CA_CERTIFICATE>'
}
}
kafkaSettings:
tls:
trustedCaCertificateConfigMapRef: <YOUR_CA_CERTIFICATE>
此 ConfigMap 應包含 PEM 格式的 CA 憑證。 ConfigMap 必須與 Kafka 資料流程資源位於相同的命名空間中。 例如:
kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations
秘訣
連接到 Azure 事件中樞時,不需要 CA 憑證,因為事件中樞服務會使用由預設受信任的公用 CA 所簽署的憑證。
取用者群組識別碼
取用者群組識別碼用於識別資料流程用於從 Kafka 主題讀取訊息的取用者群組。 取用者群組識別碼在 Kafka 代理程式內必須是唯一的。
重要事項
當 Kafka 端點用作來源時,需要取用者群組識別碼。 否則,資料流程無法從 Kafka 主題讀取訊息,而且您會收到錯誤「Kafka 類型來源端點必須已定義 consumerGroupId」。
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [取用者群組識別碼] 欄位來指定取用者群組識別碼。
{
"endpointType": "Kafka",
"kafkaSettings": {
"consumerGroupId": "<ID>",
}
}
kafkaSettings: {
consumerGroupId: '<ID>'
}
spec:
kafkaSettings:
consumerGroupId: <ID>
只有當端點用作來源 (即資料流程是取用者) 時,此設定才會生效。
壓縮
壓縮欄位會啟用傳送至 Kafka 主題之訊息的壓縮。 壓縮有助於降低資料傳輸所需的網路頻寬和儲存空間。 不過,壓縮也會為流程增加一些額外負荷和延遲。 下表列出支援的壓縮類型。
| 值 |
描述 |
None |
不會套用壓縮或批次處理。 如果未指定壓縮,則 None 是預設值。 |
Gzip |
會套用 GZIP 壓縮和批次處理。 GZIP 是一般用途的壓縮演算法,可提供壓縮比例與速度之間的良好平衡。
Azure 事件中樞進階層和專用層目前僅支援 GZIP 壓縮。 |
Snappy |
會套用 Snappy 壓縮和批次處理。 Snappy 是一種快速壓縮演算法,可提供中度壓縮比例和速度。 Azure 事件中樞不支援此壓縮模式。 |
Lz4 |
會套用 LZ4 壓縮和批次處理。 LZ4 是一種快速壓縮演算法,可提供低壓縮比例和高速。 Azure 事件中樞不支援此壓縮模式。 |
若要設定壓縮:
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [壓縮] 欄位來指定壓縮類型。
{
"endpointType": "Kafka",
"kafkaSettings": {
"compression": "Gzip"
}
}
kafkaSettings: {
compression: 'Gzip' // Or Snappy, Lz4
}
kafkaSettings:
compression: Gzip # Or Snappy, Lz4
只有當端點用作資料流程是生產者的目的地時,此設定才會生效。
批次處理
除了壓縮之外,您也可以在將訊息傳送至 Kafka 主題之前,先設定訊息的批次處理。 批次處理可讓您將多個訊息分組在一起,並將其壓縮為單一單位,以提升壓縮效率並降低網路負荷。
| 欄位 |
描述 |
必要 |
mode |
可以是 Enabled 或 Disabled。 預設值為 Enabled,因為 Kafka 沒有非批次處理傳訊的概念。 如果設定為 Disabled,則批次處理會被最小化,以便每次建立包含一則單一訊息的批次。 |
否 |
latencyMs |
訊息在傳送之前可以緩衝處理的最大時間間隔,以毫秒為單位。 如果達到此間隔,則所有緩衝的訊息都會以批次方式傳送,而不論其大小為何。 如果未設定,預設值為 5。 |
否 |
maxMessages |
在傳送之前可以緩衝處理的訊息數目上限。 如果達到此數字,則所有緩衝的訊息都會作為一批來傳送,無論它們有多大或緩衝多長的時間。 如果未設定,預設值為 100000。 |
否 |
maxBytes |
在傳送之前可以緩衝處理的位元組大小上限。 如果達到此大小,則所有緩衝的訊息都會作為一批來傳送,無論其數量或緩衝的時間為何。 預設值為 1000000 (1 MB)。 |
否 |
例如,如果您將 latencyMs 設為 1000、將 maxMessages 設為 100,以及將 maxBytes 設為 1024,則當緩衝區中有 100 則訊息時,或當緩衝區中有 1,024 個位元組時,或自上次傳送以來已過去 1,000 毫秒時 (以先到者為準),就會傳送訊息。
若要設定批次處理:
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [啟用批次處理] 欄位來啟用批次處理。 使用批次處理延遲、最大位元組數及訊息計數欄位來指定批次處理設定。
{
"endpointType": "Kafka",
"kafkaSettings": {
"batching": {
"latencyMs": 10,
"maxBytes": 200000,
"maxMessages": 20000,
"mode": "Enabled"
}
}
}
kafkaSettings: {
batching: {
mode: 'Enabled' // Or Disabled
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
}
}
kafkaSettings:
batching:
mode: Enabled # Or Disabled
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
只有當端點用作資料流程是生產者的目的地時,此設定才會生效。
分區處理策略
分區處理策略可控制將訊息傳送到 Kafka 主題時如何指派給 Kafka 分區。 Kafka 分區是 Kafka 主題的邏輯區段,可啟用平行處理和容錯。 Kafka 主題中的每個訊息都有一個分區和一個位移,其用來識別和排序訊息。
只有當端點用作資料流程是生產者的目的地時,此設定才會生效。
根據預設,資料流程會使用循環配置資源演算法,將訊息指派給隨機分區。 不過,您可以使用不同的策略,根據某些準則將訊息指派給分區,例如 MQTT 主題名稱或 MQTT 訊息屬性。 這可協助您達成更好的負載平衡、資料位置或訊息排序。
| 值 |
描述 |
Default |
使用循環配置資源演算法,將訊息指派給隨機分區。 如果未指定任何策略,則這是預設值。 |
Static |
將訊息指派給衍生自資料流程執行個體識別碼的固定分區編號。 這表示每個資料流程執行個體都會將訊息傳送至不同的分區。 這有助於達到更好的負載平衡和資料位置。 |
Topic |
使用資料流程來源中的 MQTT 主題名稱作為分區的索引鍵。 這表示具有相同 MQTT 主題名稱的訊息會傳送至相同的分區。 這有助於達到更好的訊息排序和資料位置。 |
Property |
使用資料流程來源中的 MQTT 訊息屬性作為資料分區的索引鍵。 在 [partitionKeyProperty] 欄位中指定屬性的名稱。 這表示具有相同屬性值的訊息會傳送至相同的分割區。 這有助於根據自訂準則達成更佳的訊息排序和資料位置。 |
例如,如果您將分區處理策略設定為 Property,並將分區索引鍵屬性設定為 device-id,則具有相同 device-id 屬性的訊息會傳送至相同的分區。
若要設定分區處理策略:
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [資料分割處理策略] 欄位來指定資料分割處理策略。 如果策略設定為 ,則使用 [分區索引鍵屬性]Property 欄位來指定用於分區的屬性。
{
"endpointType": "Kafka",
"kafkaSettings": {
"partitionStrategy": "Property", // Or Default, Topic, Property
"partitionKeyProperty": "<PROPERTY_NAME>" // Required if partitionStrategy is Property
}
}
kafkaSettings: {
partitionStrategy: 'Default' // Or Static, Topic, Property
partitionKeyProperty: '<PROPERTY_NAME>'
}
kafkaSettings:
partitionStrategy: Default # Or Static, Topic, Property
partitionKeyProperty: <PROPERTY_NAME>
Kafka 確認
Kafka 確認 (acks) 可用來控制傳送至 Kafka 主題之訊息的持久性和一致性。 當產生者將訊息傳送至 Kafka 主題時,它可以向 Kafka 代理程式要求不同層級的確認,以確保訊息已成功寫入主題並在 Kafka 叢集中複寫。
只有當端點用作目的地 (即資料流程為生產者) 時,此設定才會生效。
| 值 |
描述 |
None |
資料流程不會等待來自 Kafka 代理程式的任何確認。 此設定是速度最快但最不持久的選項。 |
All |
資料流程會等待訊息寫入領導者分區和所有追隨者分區。 此設定是最慢但最持久的選項。 此設定也是預設選項 |
One |
資料流程會等待訊息寫入領導者分區和至少一個追隨者分區。 |
Zero |
資料流程會等待訊息寫入領導者分區,但不會等待來自追隨者的任何確認。 這比 One 更快,但較不持久。 |
例如,如果您將 Kafka 確認設定為 All,則資料流程會等待訊息寫入領導者分區和所有追隨者分區後再傳送下一則訊息。
若要設定 Kafka 確認:
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [Kafka 確認] 欄位來指定 Kafka 確認層級。
{
"endpointType": "Kafka",
"kafkaSettings": {
"kafkaAcks": "All" // Or None, One, Zero
}
}
kafkaSettings: {
kafkaAcks: 'All' // Or None, One, Zero
}
kafkaSettings:
kafkaAcks: All # Or None, One, Zero
只有當端點用作資料流程是生產者的目的地時,此設定才會生效。
複製 MQTT 屬性
根據預設,會啟用複製 MQTT 屬性設定。 這些使用者屬性包括 subject 之類的值,可儲存傳送訊息的資產名稱。
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [複製 MQTT 屬性] 欄位旁的核取方塊來啟用或停用複製 MQTT 屬性。
{
"endpointType": "Kafka",
"kafkaSettings": {
"copyMqttProperties": "Enabled"
}
}
kafkaSettings: {
copyMqttProperties: 'Enabled' // Or Disabled
}
kafkaSettings:
copyMqttProperties: Enabled # Or Disabled
下列各節說明啟用該設定時如何將 MQTT 屬性轉換為 Kafka 使用者標頭,反之亦然。
Kafka 端點是目的地
當 Kafka 端點是資料流程目的地時,所有 MQTT v5 規格定義的屬性都會轉換為 Kafka 使用者標頭。 例如,轉送到 Kafka 的具有「Content Type」的 MQTT v5 訊息會轉換為 Kafka 使用者標頭"Content Type":{specifiedValue}。 類似的規則適用於其他內建 MQTT 屬性,如下表中所定義。
| MQTT 屬性 |
已轉換的行為 |
| 承載格式指標 |
索引鍵:「承載格式指標」 值:"0" (承載為位元組) 或 "1" (承載為 UTF-8) |
| 回應主題 |
索引鍵:「回應主題」 值:原始訊息中的回應主題的複本。 |
| 訊息到期間隔 |
索引鍵:「訊息到期間隔」 值:訊息到期前的秒數 UTF-8 表示法。 如需詳細資料,請參閱訊息到期間隔屬性。 |
| 相互關聯資料: |
索引鍵:「相互關聯資料」 值:原始訊息中的相互關聯資料的複本。 與許多採用 UTF-8 編碼的 MQTT v5 屬性不同,相互關聯資料可以是任意資料。 |
| 內容類型: |
索引鍵:「內容類型」 值:原始訊息中的內容類型的複本。 |
MQTT v5 使用者屬性索引鍵值組會直接轉換為 Kafka 使用者標頭。 如果訊息中的使用者標頭的名稱與內建 MQTT 屬性相同 (例如,名為「相互關聯資料」的使用者標頭),則轉送 MQTT v5 規格屬性值或使用者屬性是未定義的。
資料流程永遠不會從 MQTT 代理程式中接收這些屬性。 因此,資料流程永遠不會轉送它們:
訊息到期間隔屬性
訊息到期間隔可指定訊息在被捨棄之前可以在 MQTT 代理程式中保留多久。
當資料流程接收到指定了「訊息到期間隔」的 MQTT 訊息時,它會:
- 記錄收到訊息的時間。
- 在訊息發送到目的地之前,會從原本的到期時間間隔中扣除訊息在佇列中等待的時間。
- 如果訊息尚未過期 (上述計算結果 > 0),則訊息會被發送到目的地並包含更新的「訊息到期時間」。
- 如果訊息已過期 (上述計算結果 <= 0),則目標不會發送該訊息。
範例:
- 資料流程會接收「訊息到期間隔」= 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 在此 MQTT 訊息被傳送到目標之前,已經過了 1,000 秒。 在此情況下,目的地的訊息會將其「訊息到期間隔」設定為 2600 (3600 - 1000) 秒。
- 該資料流程會接收「訊息到期間隔」= 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 不過,在此情況下,重新連線需要 4,000 秒。 訊息已過期,且資料流程不會將此訊息轉送至目的地。
Kafka 端點是資料流程的來源
附註
使用「事件中樞」端點作為資料流程的來源時有一個已知的問題,即 Kafka 標頭在轉換為 MQTT 時會損毀。 只有當透過「事件中樞」用戶端 (其底層使用 AMQP) 使用「事件中樞」時才會發生這種情況。 例如 "foo"="bar",雖然轉換了 "foo",但值變成了 "\xa1\x03bar"。
當 Kafka 端點是資料流程來源時,Kafka 使用者標頭會轉換為 MQTT v5 屬性。 下表說明如何將 Kafka 使用者標頭轉換為 MQTT v5 屬性。
| Kafka 標頭 |
已轉換的行為 |
| 索引鍵 |
索引鍵:「索引鍵」 值:原始訊息中的索引碼的複本。 |
| 時間戳記 |
索引鍵:「時間戳記」 值:Kafka 時間戳記的 UTF-8 編碼,即自 Unix Epoch 以來的毫秒數。 |
Kafka 使用者標頭索引鍵/值組 (假設它們全部以 UTF-8 編碼) 會直接轉換為 MQTT 使用者索引鍵/值屬性。
UTF-8 / 二進位格式不相符
MQTT v5 僅支援 UTF-8 型的屬性。 如果資料流程收到包含一或多個非 UTF-8 標頭的 Kafka 訊息,則資料流程會:
- 移除造成問題的屬性。
- 按照前述規則,將其餘的訊息轉送出去。
需要在 Kafka 來源標頭 = > MQTT 目標屬性中進行二進位傳輸的應用程式必須先對其進行 UTF-8 編碼 (例如,透過 Base64)。
>=64KB 屬性不相符
MQTT v5 屬性必須小於 64 KB。 如果資料流程收到包含一或多個 >= 64KB 的標頭的 Kafka 訊息,則資料流程會:
- 移除造成問題的屬性。
- 按照前述規則,將其餘的訊息轉送出去。
使用「事件中樞」和使用 AMQP 的生產者時的屬性轉換
如果您有一個轉送訊息的用戶端,則 Kafka 資料流程來源端點會執行下列任何動作:
- 使用用戶端程式庫 (例如 Azure.Messaging.EventHubs) 來將訊息傳送至事件中樞
- 直接使用 AMQP
需要注意屬性轉換時的一些細節差異。
您應執行下列其中一項動作:
- 避免傳送屬性
- 如果您必須傳送屬性,請傳送以 UTF-8 編碼的值。
當「事件中樞」將屬性從 AMQP 轉換為 Kafka 時,它會在其訊息中包含底層 AMQP 編碼類型。 如需行為的詳細資訊,請參閱使用不同通訊協定在取用者和產生者之間交換事件。
在下面的程式碼範例中,當資料流程端點接收到值 "foo":"bar" 時,它會將屬性接收為 <0xA1 0x03 "bar">。
using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;
var propertyEventBody = new BinaryData("payload");
var propertyEventData = new EventData(propertyEventBody)
{
Properties =
{
{"foo", "bar"},
}
};
var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);
資料流程端點無法將承載屬性 <0xA1 0x03 "bar"> 轉送至 MQTT 訊息,因為資料不是 UTF-8。 不過,如果您指定 UTF-8 字串,資料流程端點會在傳送至 MQTT 之前轉換該字串。 如果您使用 UTF-8 字串,MQTT 訊息將具有 "foo":"bar" 作為使用者屬性。
只會轉換 UTF-8 標頭。 例如,假設在下列將屬性設為浮點數的案例中:
Properties =
{
{"float-value", 11.9 },
}
資料流程端點會捨棄包含 "float-value" 欄位的封包。
並非所有事件資料屬性 (包括 propertyEventData.correlationId) 都會被轉送。 如需詳細資訊,請參閱事件使用者屬性、
CloudEvents
CloudEvents 是一種以通用方式描述事件資料的方法。 CloudEvents 設定用於傳送或接收 CloudEvents 格式的訊息。 您可以將 CloudEvents 用於事件驅動的架構,其中不同的服務需要在相同或不同的雲端提供者中相互通訊。
CloudEventAttributes 選項為 Propagate 或 CreateOrRemap。
在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [雲端事件屬性] 欄位來指定 CloudEvents 設定。
{
"endpointType": "Kafka",
"kafkaSettings": {
"cloudEventAttributes": "Propagate"
}
}
kafkaSettings: {
cloudEventAttributes: 'Propagate' // Or CreateOrRemap
}
kafkaSettings:
cloudEventAttributes: Propagate # Or CreateOrRemap
下列各節說明如何傳播或建立及重新對應 CloudEvent 屬性。
傳播設定
對於包含必要屬性的訊息,會傳遞 CloudEvent 屬性。 如果訊息未包含必要的屬性,則訊息會按原樣傳遞。 如果存在必要的屬性,則會在 CloudEvent 屬性名稱中新增 ce_ 前置詞。
| 名稱 |
必要 |
範例值 |
輸出名稱 |
輸出值 |
specversion |
是的 |
1.0 |
ce-specversion |
照原樣傳遞 |
type |
是的 |
ms.aio.telemetry |
ce-type |
照原樣傳遞 |
source |
是的 |
aio://mycluster/myoven |
ce-source |
照原樣傳遞 |
id |
是的 |
A234-1234-1234 |
ce-id |
照原樣傳遞 |
subject |
否 |
aio/myoven/sensor/temperature |
ce-subject |
照原樣傳遞 |
time |
否 |
2018-04-05T17:31:00Z |
ce-time |
照原樣傳遞。 它未被重新標記。 |
datacontenttype |
否 |
application/json |
ce-datacontenttype |
在選用的轉換階段之後已變更為輸出資料內容類型。 |
dataschema |
否 |
sr://fabrikam-schemas/123123123234234234234234#1.0.0 |
ce-dataschema |
如果在轉換組態中指定了輸出資料轉換結構描述,則 dataschema 會變更為輸出結構描述。 |
CreateOrRemap 設定
對於包含必要屬性的訊息,會傳遞 CloudEvent 屬性。 如果訊息未包含必要的屬性,則會產生屬性。
| 名稱 |
必要 |
輸出名稱 |
遺漏時產生的值 |
specversion |
是的 |
ce-specversion |
1.0 |
type |
是的 |
ce-type |
ms.aio-dataflow.telemetry |
source |
是的 |
ce-source |
aio://<target-name> |
id |
是的 |
ce-id |
在目標用戶端中產生的 UUID |
subject |
否 |
ce-subject |
傳送訊息所在的輸出主題 |
time |
否 |
ce-time |
在目標用戶端中產生為 RFC 3339 |
datacontenttype |
否 |
ce-datacontenttype |
在選用的轉換階段之後已變更為輸出資料內容類型 |
dataschema |
否 |
ce-dataschema |
結構描述登錄中定義的結構描述 |
後續步驟
若要深入了解資料流程,請參閱建立資料流程。