共用方式為


設定 Azure 事件中樞和 Kafka 資料流程端點

重要事項

此頁面包含使用 Kubernetes 部署資訊清單來管理 Azure IoT 操作元件的指示,其處於預覽狀態。 此功能以數項限制提供,不應用於生產工作負載。

請參閱 Microsoft Azure 預覽版增補使用規定,以了解適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的法律條款。

若要設定 Azure IoT 操作與 Apache Kafka 代理程式之間的雙向通訊,您可以設定資料流程端點。 此組態可讓您指定端點、傳輸層安全性 (TLS)、驗證和其他設定。

先決條件

Azure 事件中樞

Azure 事件中樞與 Kafka 通訊協定相容,而且可與資料流程搭配使用,但有一些限制。

建立 Azure 事件中樞命名空間和事件中樞

首先,建立已啟用 Kafka 的 Azure 事件中樞命名空間

接下來,在命名空間中建立事件中樞。 每個個別的事件中樞都會對應至 Kafka 主題。 您可以在相同的命名空間中建立多個事件中樞來代表多個 Kafka 主題。

將權限指派給受控識別

若要設定 Azure 事件中樞的資料流程端點,建議您使用使用者指派或系統指派的受控識別。 這種方法是安全的,而且不需要手動管理認證。

建立 Azure 事件中樞命名空間和事件中樞之後,您必須將角色指派給 Azure IoT Operations 受控識別,以授與傳送或接收訊息給事件中樞的許可權。

如果使用系統指派的受控識別,請在 Azure 入口網站中移至您的 Azure IoT 操作執行個體,然後選取 [概觀]。 複製 Azure IoT 操作Arc 延伸模組之後所列的延伸模組名稱。 例如,azure-iot-operations-xxxx7。 使用 Azure IoT 操作Arc 延伸模組的相同名稱,即可找到系統指派的受控識別。

然後,移至 [事件中樞] 命名空間 > [存取控制 (IAM)]>[新增角色指派]

  1. 在 [ 角色] 索引標籤上,選取適當的角色,例如 Azure Event Hubs Data SenderAzure Event Hubs Data Receiver。 這會為受控識別提供傳送或接收命名空間中所有事件中樞訊息所需的權限。 若要深入了解,請參閱使用 Microsoft Entra ID 驗證應用程式以存取事件中樞資源
  2. 在 [成員] 索引標籤上:
    1. 如果使用系統指派的受控識別,對於 [存取權指派對象為],請選取 [使用者、群組或服務主體] 選項,然後選取 [+ 選取成員] 並搜尋 Azure IoT 操作 Arc 延伸模組的名稱。
    2. 如果使用使用者指派的受控識別,對於 [存取權指派對象為],請選取 [受控識別] 選項,然後選取 [+ 選取成員] 並搜尋針對雲端連線設定的使用者指派的受控識別

建立 Azure 事件中樞的資料流程端點

設定 Azure 事件中樞命名空間和事件中樞之後,您可以為已啟用 Kafka 的 Azure 事件中樞命名空間建立數據流端點。

  1. 在 [作業體驗] 中,選取 [資料流程端點] 索引標籤。

  2. 在 [建立新的資料流程端點] 下,選取 [Azure 事件中樞]>[新增]

    使用作業體驗建立 Azure 事件中樞資料流程端點的螢幕擷取畫面。

  3. 輸入端點的下列設定:

    設定 描述
    名稱 資料流端點的名稱。
    主辦人 事件中樞主機的主機名稱。 您可以搜尋現有的事件中樞主機,或使用 格式 <NAMESPACE>.servicebus.windows.net手動輸入主機名。
    港口 事件中樞主機的連接埠。 對於事件中樞,連接埠為 9093
    驗證方法 用於驗證的方法。 我們建議您選擇系統指派的受控識別使用者指派的受控識別
  4. 選取 [套用] 以佈建端點。

附註

當您建立資料流程時,稍後會設定 Kafka 主題或個別的事件中樞。 Kafka 主題是資料流程訊息的目的地。

使用連接字串來對事件中樞進行驗證

重要事項

若要使用作業體驗 Web UI 來管理秘密,必須先設定 Azure Key Vault 並啟用工作負載身分識別,以使用安全設定來啟用 Azure IoT 作業。 若要深入了解,請參閱在 Azure IoT 操作部署中啟用安全設定

在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[SASL]

輸入端點的下列設定:

設定 描述
SASL 類型 選擇 Plain
已同步的祕密名稱 輸入包含連接字串的 Kubernetes 秘密名稱。
使用者名稱參考或權杖密碼 用於 SASL 驗證的使用者名稱參考或權杖密碼。 請從 Key Vault 清單中挑選它或建立一個新的。 值必須是 $ConnectionString
權杖密碼的密碼參考 用於 SASL 驗證的密碼參考或權杖密碼。 請從 Key Vault 清單中挑選它或建立一個新的。 值的格式必須是 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>

選取 [新增參考] 之後,如果您選取 [建立新的],請輸入下列設定:

設定 描述
祕密名稱 Azure Key Vault 中的祕密名稱。 挑選容易記住的名稱,以便稍後從清單中選取祕密。
祕密值 對於使用者名稱,請輸入 $ConnectionString。 對於密碼,請輸入格式為 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY> 的連接字串。
設定啟用日期 若已開啟,則為祕密變成作用中的日期。
設定到期日期 如果已開啟,則為祕密到期的日期。

若要深入了解秘密,請參閱在 Azure IoT 操作中建立和管理秘密

限制

Azure 事件中樞不支援 Kafka 支援的所有壓縮類型。 Azure 事件中樞進階層和專用層中目前僅支援 GZIP 壓縮。 使用其他壓縮類型可能會導致錯誤。

自訂 Kafka 代理程式

若要為非事件中樞 Kafka 代理程式設定資料流程端點,請根據需要設定主機、TLS、驗證和其他設定。

  1. 在 [作業體驗] 中,選取 [資料流程端點] 索引標籤。

  2. 在 [建立新的資料流程端點] 下,選取 [自訂 Kafka 代理程式]>[新增]

    使用作業體驗來建立 Kafka 資料流程端點的螢幕擷取畫面。

  3. 輸入端點的下列設定:

    設定 描述
    名稱 資料流端點的名稱。
    主辦人 格式為 <Kafka-broker-host>:xxxx 的 Kafka 代理程式的主機名稱。 在主機設定中包含連接埠號碼。
    驗證方法 用於驗證的方法。 選擇 SASL
    SASL 類型 SASL 驗證的類型。 選擇 PlainScramSha256ScramSha512。 如果使用 SASL,則為必要項。
    已同步的祕密名稱 秘密的名稱。 如果使用 SASL,則為必要項。
    權杖密碼的使用者名稱參考 SASL 權杖密碼中使用者名稱的參考。 如果使用 SASL,則為必要項。
  4. 選取 [套用] 以佈建端點。

附註

目前,作業體驗不支援使用 Kafka 資料流程端點作為來源。 您可以使用 Kubernetes 或 Bicep 來建立具有來源 Kafka 資料流程端點的資料流程。

若要自訂端點設定,請使用下列各節來取得詳細資訊。

可用的驗證方法

下列驗證方法可用於 Kafka 代理程式資料流程端點。

系統指派的受控識別

設定資料流端點之前,請先將角色指派給 Azure IoT 操作受控識別,其會授與連線至 Kafka 代理程式的權限:

  1. 在 Azure 入口網站中,移至您的 Azure IoT 操作執行個體,然後選取 [概觀]
  2. 複製 Azure IoT 操作Arc 延伸模組之後所列的延伸模組名稱。 例如,azure-iot-operations-xxxx7
  3. 移至您需要授與權限的雲端資源。 例如,移至 [事件中樞] 命名空間 > [存取控制 (IAM)]>[新增角色指派]
  4. 在 [ 角色] 索引標籤上,選取適當的角色。
  5. 在 [成員] 索引標籤上,對於 [存取權指派對象為],請選取 [使用者、群組或服務主體] 選項,然後選取 [+ 選取成員] 並搜尋 Azure IoT 操作受控識別。 例如,azure-iot-operations-xxxx7

然後,使用系統指派的受控識別設定來設定資料流端點。

在 [作業體驗資料流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[系統指派的受控識別]

此組態會建立具有預設對象的受控識別,該對象與 https://<NAMESPACE>.servicebus.windows.net 格式的事件中樞命名空間主機值相同。 不過,如果您需要覆寫預設對象,您可以將 audience 欄位設定為所需的值。

作業體驗中不支援。

使用者指派的受控識別

若要將使用者指派的受控識別用於驗證,您必須先部署已啟用安全設定的 Azure IoT 操作。 然後,您必須為雲端連線設定使用者指派的受控識別。 若要深入了解,請參閱在 Azure IoT 操作部署中啟用安全設定

在設定資料流程端點之前,請為使用者指派的受控識別指派一個角色,以授與連接到 Kafka 代理程式的權限:

  1. 在 Azure 入口網站中,移至您需要授與權限的雲端資源。 例如,移至 [事件方格] 命名空間 > [存取控制 (IAM)]>[新增角色指派]
  2. 在 [ 角色] 索引標籤上,選取適當的角色。
  3. 在 [成員] 索引標籤上,對於 [存取權指派對象為],請選取 [受控識別] 選項,然後選取 [+ 選取成員] 並搜尋使用者指派的受控識別。

然後,使用使用者指派的受控識別設定來設定資料流端點。

在 [作業體驗資料流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[使用者指派的受控識別]

在這裡,範圍是受控識別的對象。 預設值與事件中樞命名空間主機值相同,格式為 https://<NAMESPACE>.servicebus.windows.net。 不過,如果您需要覆寫預設對象,您可以使用 Bicep 或 Kubernetes,將範圍欄位設定為所需的值。

SASL

若要使用 SASL 進行驗證,請指定 SASL 驗證方法,並設定 SASL 類型和包含 SASL 權杖的祕密名稱的秘密參考。

在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[SASL]

輸入端點的下列設定:

設定 描述
SASL 類型 要使用的 SASL 驗證類型。 支援的類型為 PlainScramSha256ScramSha512
已同步的祕密名稱 包含 SASL 權杖的 Kubernetes 秘密名稱。
使用者名稱參考或權杖密碼 用於 SASL 驗證的使用者名稱參考或權杖密碼。
權杖密碼的密碼參考 用於 SASL 驗證的密碼參考或權杖密碼。

支援的 SASL 類型如下:

  • Plain
  • ScramSha256
  • ScramSha512

祕密必須與 Kafka 資料流程端點位於同一個命名空間中。 秘密必須具有 SASL 權杖作為索引鍵/值組。

匿名

若要使用匿名驗證,請更新 Kafka 設定的驗證區段以使用匿名方法。

在 [作業體驗資料流程端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法]>[無]

進階設定

您可以設定 Kafka 資料流程端點的進階設定,例如 TLS、受信任的 CA 憑證、Kafka 傳訊設定、批次處理和 CloudEvents。 您可以在資料流程端點 [進階] 入口網站索引標籤中或資料流程端點資源內設定這些設定。

在作業體驗中,選取資料流端點的 [進階] 索引標籤。

使用作業體驗來設定 Kafka 資料流程端點進階設定的螢幕擷取畫面。

TLS 設定

TLS 模式

若要啟用或停用 Kafka 端點的 TLS,請更新 TLS 設定中的 mode 設定。

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [啟用 TLS 模式] 旁的核取方塊。

TLS 模式可以設定為 EnabledDisabled。 如果模式設定為 Enabled,則資料流程會使用安全的連線到 Kafka 代理程式。 如果模式設定為 Disabled,則資料流程會使用不安全的連線到 Kafka 代理程式。

受信任的 CA 憑證

為 Kafka 端點設定受信任的 CA 憑證,以建立安全的連線到 Kafka 代理程式。 如果 Kafka 代理程式使用自我簽署憑證或由預設不受信任的自訂 CA 所簽署的憑證,則此設定就很重要。

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [信任的 CA 憑證組態對應] 欄位來指定包含受信任 CA 憑證的 ConfigMap。

此 ConfigMap 應包含 PEM 格式的 CA 憑證。 ConfigMap 必須與 Kafka 資料流程資源位於相同的命名空間中。 例如:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

秘訣

連接到 Azure 事件中樞時,不需要 CA 憑證,因為事件中樞服務會使用由預設受信任的公用 CA 所簽署的憑證。

取用者群組識別碼

取用者群組識別碼用於識別資料流程用於從 Kafka 主題讀取訊息的取用者群組。 取用者群組識別碼在 Kafka 代理程式內必須是唯一的。

重要事項

當 Kafka 端點用作來源時,需要取用者群組識別碼。 否則,資料流程無法從 Kafka 主題讀取訊息,而且您會收到錯誤「Kafka 類型來源端點必須已定義 consumerGroupId」。

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [取用者群組識別碼] 欄位來指定取用者群組識別碼。

只有當端點用作來源 (即資料流程是取用者) 時,此設定才會生效。

壓縮

壓縮欄位會啟用傳送至 Kafka 主題之訊息的壓縮。 壓縮有助於降低資料傳輸所需的網路頻寬和儲存空間。 不過,壓縮也會為流程增加一些額外負荷和延遲。 下表列出支援的壓縮類型。

描述
None 不會套用壓縮或批次處理。 如果未指定壓縮,則 None 是預設值。
Gzip 會套用 GZIP 壓縮和批次處理。 GZIP 是一般用途的壓縮演算法,可提供壓縮比例與速度之間的良好平衡。 Azure 事件中樞進階層和專用層目前僅支援 GZIP 壓縮
Snappy 會套用 Snappy 壓縮和批次處理。 Snappy 是一種快速壓縮演算法,可提供中度壓縮比例和速度。 Azure 事件中樞不支援此壓縮模式。
Lz4 會套用 LZ4 壓縮和批次處理。 LZ4 是一種快速壓縮演算法,可提供低壓縮比例和高速。 Azure 事件中樞不支援此壓縮模式。

若要設定壓縮:

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [壓縮] 欄位來指定壓縮類型。

只有當端點用作資料流程是生產者的目的地時,此設定才會生效。

批次處理

除了壓縮之外,您也可以在將訊息傳送至 Kafka 主題之前,先設定訊息的批次處理。 批次處理可讓您將多個訊息分組在一起,並將其壓縮為單一單位,以提升壓縮效率並降低網路負荷。

欄位 描述 必要
mode 可以是 EnabledDisabled。 預設值為 Enabled,因為 Kafka 沒有非批次處理傳訊的概念。 如果設定為 Disabled,則批次處理會被最小化,以便每次建立包含一則單一訊息的批次。
latencyMs 訊息在傳送之前可以緩衝處理的最大時間間隔,以毫秒為單位。 如果達到此間隔,則所有緩衝的訊息都會以批次方式傳送,而不論其大小為何。 如果未設定,預設值為 5。
maxMessages 在傳送之前可以緩衝處理的訊息數目上限。 如果達到此數字,則所有緩衝的訊息都會作為一批來傳送,無論它們有多大或緩衝多長的時間。 如果未設定,預設值為 100000。
maxBytes 在傳送之前可以緩衝處理的位元組大小上限。 如果達到此大小,則所有緩衝的訊息都會作為一批來傳送,無論其數量或緩衝的時間為何。 預設值為 1000000 (1 MB)。

例如,如果您將 latencyMs 設為 1000、將 maxMessages 設為 100,以及將 maxBytes 設為 1024,則當緩衝區中有 100 則訊息時,或當緩衝區中有 1,024 個位元組時,或自上次傳送以來已過去 1,000 毫秒時 (以先到者為準),就會傳送訊息。

若要設定批次處理:

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [啟用批次處理] 欄位來啟用批次處理。 使用批次處理延遲最大位元組數訊息計數欄位來指定批次處理設定。

只有當端點用作資料流程是生產者的目的地時,此設定才會生效。

分區處理策略

分區處理策略可控制將訊息傳送到 Kafka 主題時如何指派給 Kafka 分區。 Kafka 分區是 Kafka 主題的邏輯區段,可啟用平行處理和容錯。 Kafka 主題中的每個訊息都有一個分區和一個位移,其用來識別和排序訊息。

只有當端點用作資料流程是生產者的目的地時,此設定才會生效。

根據預設,資料流程會使用循環配置資源演算法,將訊息指派給隨機分區。 不過,您可以使用不同的策略,根據某些準則將訊息指派給分區,例如 MQTT 主題名稱或 MQTT 訊息屬性。 這可協助您達成更好的負載平衡、資料位置或訊息排序。

描述
Default 使用循環配置資源演算法,將訊息指派給隨機分區。 如果未指定任何策略,則這是預設值。
Static 將訊息指派給衍生自資料流程執行個體識別碼的固定分區編號。 這表示每個資料流程執行個體都會將訊息傳送至不同的分區。 這有助於達到更好的負載平衡和資料位置。
Topic 使用資料流程來源中的 MQTT 主題名稱作為分區的索引鍵。 這表示具有相同 MQTT 主題名稱的訊息會傳送至相同的分區。 這有助於達到更好的訊息排序和資料位置。
Property 使用資料流程來源中的 MQTT 訊息屬性作為資料分區的索引鍵。 在 [partitionKeyProperty] 欄位中指定屬性的名稱。 這表示具有相同屬性值的訊息會傳送至相同的分割區。 這有助於根據自訂準則達成更佳的訊息排序和資料位置。

例如,如果您將分區處理策略設定為 Property,並將分區索引鍵屬性設定為 device-id,則具有相同 device-id 屬性的訊息會傳送至相同的分區。

若要設定分區處理策略:

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [資料分割處理策略] 欄位來指定資料分割處理策略。 如果策略設定為 ,則使用 [分區索引鍵屬性]Property 欄位來指定用於分區的屬性。

Kafka 確認

Kafka 確認 (acks) 可用來控制傳送至 Kafka 主題之訊息的持久性和一致性。 當產生者將訊息傳送至 Kafka 主題時,它可以向 Kafka 代理程式要求不同層級的確認,以確保訊息已成功寫入主題並在 Kafka 叢集中複寫。

只有當端點用作目的地 (即資料流程為生產者) 時,此設定才會生效。

描述
None 資料流程不會等待來自 Kafka 代理程式的任何確認。 此設定是速度最快但最不持久的選項。
All 資料流程會等待訊息寫入領導者分區和所有追隨者分區。 此設定是最慢但最持久的選項。 此設定也是預設選項
One 資料流程會等待訊息寫入領導者分區和至少一個追隨者分區。
Zero 資料流程會等待訊息寫入領導者分區,但不會等待來自追隨者的任何確認。 這比 One 更快,但較不持久。

例如,如果您將 Kafka 確認設定為 All,則資料流程會等待訊息寫入領導者分區和所有追隨者分區後再傳送下一則訊息。

若要設定 Kafka 確認:

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [Kafka 確認] 欄位來指定 Kafka 確認層級。

只有當端點用作資料流程是生產者的目的地時,此設定才會生效。

複製 MQTT 屬性

根據預設,會啟用複製 MQTT 屬性設定。 這些使用者屬性包括 subject 之類的值,可儲存傳送訊息的資產名稱。

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [複製 MQTT 屬性] 欄位旁的核取方塊來啟用或停用複製 MQTT 屬性。

下列各節說明啟用該設定時如何將 MQTT 屬性轉換為 Kafka 使用者標頭,反之亦然。

Kafka 端點是目的地

當 Kafka 端點是資料流程目的地時,所有 MQTT v5 規格定義的屬性都會轉換為 Kafka 使用者標頭。 例如,轉送到 Kafka 的具有「Content Type」的 MQTT v5 訊息會轉換為 Kafka 使用者標頭"Content Type":{specifiedValue}。 類似的規則適用於其他內建 MQTT 屬性,如下表中所定義。

MQTT 屬性 已轉換的行為
承載格式指標 索引鍵:「承載格式指標」
值:"0" (承載為位元組) 或 "1" (承載為 UTF-8)
回應主題 索引鍵:「回應主題」
值:原始訊息中的回應主題的複本。
訊息到期間隔 索引鍵:「訊息到期間隔」
值:訊息到期前的秒數 UTF-8 表示法。 如需詳細資料,請參閱訊息到期間隔屬性
相互關聯資料: 索引鍵:「相互關聯資料」
值:原始訊息中的相互關聯資料的複本。 與許多採用 UTF-8 編碼的 MQTT v5 屬性不同,相互關聯資料可以是任意資料。
內容類型: 索引鍵:「內容類型」
值:原始訊息中的內容類型的複本。

MQTT v5 使用者屬性索引鍵值組會直接轉換為 Kafka 使用者標頭。 如果訊息中的使用者標頭的名稱與內建 MQTT 屬性相同 (例如,名為「相互關聯資料」的使用者標頭),則轉送 MQTT v5 規格屬性值或使用者屬性是未定義的。

資料流程永遠不會從 MQTT 代理程式中接收這些屬性。 因此,資料流程永遠不會轉送它們:

  • 主題別名
  • 訂閱識別碼
訊息到期間隔屬性

訊息到期間隔可指定訊息在被捨棄之前可以在 MQTT 代理程式中保留多久。

當資料流程接收到指定了「訊息到期間隔」的 MQTT 訊息時,它會:

  • 記錄收到訊息的時間。
  • 在訊息發送到目的地之前,會從原本的到期時間間隔中扣除訊息在佇列中等待的時間。
  • 如果訊息尚未過期 (上述計算結果 > 0),則訊息會被發送到目的地並包含更新的「訊息到期時間」。
  • 如果訊息已過期 (上述計算結果 <= 0),則目標不會發送該訊息。

範例:

  • 資料流程會接收「訊息到期間隔」= 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 在此 MQTT 訊息被傳送到目標之前,已經過了 1,000 秒。 在此情況下,目的地的訊息會將其「訊息到期間隔」設定為 2600 (3600 - 1000) 秒。
  • 該資料流程會接收「訊息到期間隔」= 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 不過,在此情況下,重新連線需要 4,000 秒。 訊息已過期,且資料流程不會將此訊息轉送至目的地。

Kafka 端點是資料流程的來源

附註

使用「事件中樞」端點作為資料流程的來源時有一個已知的問題,即 Kafka 標頭在轉換為 MQTT 時會損毀。 只有當透過「事件中樞」用戶端 (其底層使用 AMQP) 使用「事件中樞」時才會發生這種情況。 例如 "foo"="bar",雖然轉換了 "foo",但值變成了 "\xa1\x03bar"。

當 Kafka 端點是資料流程來源時,Kafka 使用者標頭會轉換為 MQTT v5 屬性。 下表說明如何將 Kafka 使用者標頭轉換為 MQTT v5 屬性。

Kafka 標頭 已轉換的行為
索引鍵 索引鍵:「索引鍵」
值:原始訊息中的索引碼的複本。
時間戳記 索引鍵:「時間戳記」
值:Kafka 時間戳記的 UTF-8 編碼,即自 Unix Epoch 以來的毫秒數。

Kafka 使用者標頭索引鍵/值組 (假設它們全部以 UTF-8 編碼) 會直接轉換為 MQTT 使用者索引鍵/值屬性。

UTF-8 / 二進位格式不相符

MQTT v5 僅支援 UTF-8 型的屬性。 如果資料流程收到包含一或多個非 UTF-8 標頭的 Kafka 訊息,則資料流程會:

  • 移除造成問題的屬性。
  • 按照前述規則,將其餘的訊息轉送出去。

需要在 Kafka 來源標頭 = > MQTT 目標屬性中進行二進位傳輸的應用程式必須先對其進行 UTF-8 編碼 (例如,透過 Base64)。

>=64KB 屬性不相符

MQTT v5 屬性必須小於 64 KB。 如果資料流程收到包含一或多個 >= 64KB 的標頭的 Kafka 訊息,則資料流程會:

  • 移除造成問題的屬性。
  • 按照前述規則,將其餘的訊息轉送出去。
使用「事件中樞」和使用 AMQP 的生產者時的屬性轉換

如果您有一個轉送訊息的用戶端,則 Kafka 資料流程來源端點會執行下列任何動作:

  • 使用用戶端程式庫 (例如 Azure.Messaging.EventHubs) 來將訊息傳送至事件中樞
  • 直接使用 AMQP

需要注意屬性轉換時的一些細節差異。

您應執行下列其中一項動作:

  • 避免傳送屬性
  • 如果您必須傳送屬性,請傳送以 UTF-8 編碼的值。

當「事件中樞」將屬性從 AMQP 轉換為 Kafka 時,它會在其訊息中包含底層 AMQP 編碼類型。 如需行為的詳細資訊,請參閱使用不同通訊協定在取用者和產生者之間交換事件

在下面的程式碼範例中,當資料流程端點接收到值 "foo":"bar" 時,它會將屬性接收為 <0xA1 0x03 "bar">

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

資料流程端點無法將承載屬性 <0xA1 0x03 "bar"> 轉送至 MQTT 訊息,因為資料不是 UTF-8。 不過,如果您指定 UTF-8 字串,資料流程端點會在傳送至 MQTT 之前轉換該字串。 如果您使用 UTF-8 字串,MQTT 訊息將具有 "foo":"bar" 作為使用者屬性。

只會轉換 UTF-8 標頭。 例如,假設在下列將屬性設為浮點數的案例中:

Properties = 
{
  {"float-value", 11.9 },
}

資料流程端點會捨棄包含 "float-value" 欄位的封包。

並非所有事件資料屬性 (包括 propertyEventData.correlationId) 都會被轉送。 如需詳細資訊,請參閱事件使用者屬性

CloudEvents

CloudEvents 是一種以通用方式描述事件資料的方法。 CloudEvents 設定用於傳送或接收 CloudEvents 格式的訊息。 您可以將 CloudEvents 用於事件驅動的架構,其中不同的服務需要在相同或不同的雲端提供者中相互通訊。

CloudEventAttributes 選項為 PropagateCreateOrRemap

在 [作業體驗資料流程端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [雲端事件屬性] 欄位來指定 CloudEvents 設定。

下列各節說明如何傳播或建立及重新對應 CloudEvent 屬性。

傳播設定

對於包含必要屬性的訊息,會傳遞 CloudEvent 屬性。 如果訊息未包含必要的屬性,則訊息會按原樣傳遞。 如果存在必要的屬性,則會在 CloudEvent 屬性名稱中新增 ce_ 前置詞。

名稱 必要 範例值 輸出名稱 輸出值
specversion 是的 1.0 ce-specversion 照原樣傳遞
type 是的 ms.aio.telemetry ce-type 照原樣傳遞
source 是的 aio://mycluster/myoven ce-source 照原樣傳遞
id 是的 A234-1234-1234 ce-id 照原樣傳遞
subject aio/myoven/sensor/temperature ce-subject 照原樣傳遞
time 2018-04-05T17:31:00Z ce-time 照原樣傳遞。 它未被重新標記。
datacontenttype application/json ce-datacontenttype 在選用的轉換階段之後已變更為輸出資料內容類型。
dataschema sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 如果在轉換組態中指定了輸出資料轉換結構描述,則 dataschema 會變更為輸出結構描述。

CreateOrRemap 設定

對於包含必要屬性的訊息,會傳遞 CloudEvent 屬性。 如果訊息未包含必要的屬性,則會產生屬性。

名稱 必要 輸出名稱 遺漏時產生的值
specversion 是的 ce-specversion 1.0
type 是的 ce-type ms.aio-dataflow.telemetry
source 是的 ce-source aio://<target-name>
id 是的 ce-id 在目標用戶端中產生的 UUID
subject ce-subject 傳送訊息所在的輸出主題
time ce-time 在目標用戶端中產生為 RFC 3339
datacontenttype ce-datacontenttype 在選用的轉換階段之後已變更為輸出資料內容類型
dataschema ce-dataschema 結構描述登錄中定義的結構描述

後續步驟

若要深入了解資料流程,請參閱建立資料流程