分享方式:


Azure Functions 的 Azure 事件中樞觸發程序

本文說明如何使用 Azure Functions 的 Azure 事件中樞 觸發程式。 Azure Functions 支援事件中樞的觸發程式和 輸出系結

如需安裝和組態詳細數據的詳細資訊,請參閱概

使用函式觸發程式來回應傳送至事件中樞事件數據流的事件。 您必須具有基礎事件中樞的讀取許可權,才能設定觸發程式。 觸發函式時,傳遞給函式的訊息會輸入為字串。

使用和進階方案的事件中樞調整決策是透過以目標為基礎的調整來完成。 如需詳細資訊,請參閱 以目標為基礎的調整

如需 Azure Functions 如何使用觸發程式回應傳送至事件中樞事件串流之事件的資訊,請參閱 整合事件中樞與 Azure 上的無伺服器函式。

重要

本文使用索引標籤來支援多個版本的 Node.js 程式設計模型。 v4 模型已正式推出,旨在為 JavaScript 和 TypeScript 開發人員提供更靈活且更直覺的體驗。 如需 v4 模型運作方式的更多詳細資料,請參閱 Azure Functions Node.js 開發人員指南。 若要深入了解 v3 與 v4 之間的差異,請參閱移轉指南

Azure Functions 支援兩種適用於 Python 的程式設計模型。 您定義系結的方式取決於您所選擇的程式設計模型。

Python v2 程式設計模型可讓您直接在 Python 函式程式代碼中使用裝飾項目來定義系結。 如需詳細資訊,請參閱 Python 開發人員指南

本文支援這兩種程序設計模型。

範例

下列範例示範根據 事件中樞觸發的 C# 函式 ,其中輸入訊息字串會寫入記錄:

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

下列範例顯示事件中樞觸發 TypeScript 函式。 函式會 讀取事件元數據 並記錄訊息。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

若要接收批次中的事件,請將 設定 cardinalitymany,如下列範例所示。

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

下列範例顯示事件中樞觸發 JavaScript 函式。 函式會 讀取事件元數據 並記錄訊息。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

若要接收批次中的事件,請將 設定 cardinalitymany,如下列範例所示。

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

以下是 PowerShell 程式代碼:

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

下列範例顯示事件中樞觸發程式系結,以及使用系結的 Python 函式。 函式會 讀取事件元數據 並記錄訊息。 此範例取決於您使用的是 v1 或 v2 Python 程式設計模型

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

下列範例顯示事件中樞觸發程式系結,此系結會記錄事件中樞觸發程式的訊息本文。

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

Java 函式運行時間連結庫中,對 EventHubTrigger 值來自事件中樞的參數使用註釋。 具有這些批注的參數會導致函式在事件送達時執行。 此批注可以搭配原生 Java 類型、POJO 或使用 可為 Null 的值使用 Optional<T>

下列範例說明大量使用 SystemProperties 和其他 Binding 選項,以進一步反省事件,並提供格式正確的 BlobOutput 日期階層式路徑。

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

屬性

進程內和隔離的背景工作進程 C# 連結庫都會使用 屬性來設定觸發程式。 C# 文稿會改用function.json組態檔,如 C# 腳本指南中所述

EventHubTriggerAttribute使用 在支援下列屬性的事件中樞上定義觸發程式。

參數 描述
EventHubName 事件中樞的名稱。 當事件中樞名稱也呈現於連接字串時,該值會在執行階段覆寫這個屬性。 可以在應用程式設定參考,例如%eventHubName%
ConsumerGroup 選擇性屬性,可設定用來訂閱中樞內事件的取用者群組。 省略時,會使用取 $Default 用者群組。
[連接] 應用程式設定或設定集合的名稱,指定如何連線到事件中樞。 若要深入了解,請參閱連線

裝飾項目

僅適用於 Python v2 程式設計模型。

針對使用裝飾項目定義的 Python v2 函式,在上 event_hub_message_trigger具有下列屬性:

屬性 說明
arg_name 代表函式程式碼中事件項目的變數名稱。
event_hub_name 事件中樞的名稱。 當事件中樞名稱也呈現於連接字串時,該值會在執行階段覆寫這個屬性。
connection 應用程式設定或設定集合的名稱,指定如何連線到事件中樞。 請參閱連線

如需使用 function.json 定義的 Python 函式,請參閱組 一節。

組態

僅適用於 Python v1 程式設計模型。

下表說明您可以在傳遞至 app.eventHub() 方法的物件options上設定的屬性。

屬性 說明
eventHubName 事件中樞的名稱。 當事件中樞名稱也呈現於連接字串時,該值會在執行階段覆寫這個屬性。 可透過應用程式設定參考 %eventHubName%
consumerGroup 選擇性屬性,可設定用來訂閱中樞內事件的取用者群組。 如果省略,則會使用 $Default 取用者群組。
基數 many設定為以啟用批處理。 如果省略或設定為 one,則會將單一訊息傳遞至 函式。
connection 應用程式設定或設定集合的名稱,指定如何連線到事件中樞。 請參閱連線

下表說明您在 function.json 檔案中設定的觸發程式組態屬性,其與運行時間版本不同。

function.json 屬性 描述
type 必須設定為 eventHubTrigger。 當您在 Azure 入口網站中建立觸發程序時,會自動設定此屬性。
direction 必須設定為 in。 當您在 Azure 入口網站中建立觸發程序時,會自動設定此屬性。
name 代表函式程式碼中事件項目的變數名稱。
eventHubName 事件中樞的名稱。 當事件中樞名稱也呈現於連接字串時,該值會在執行階段覆寫這個屬性。 可透過應用程式設定參考 %eventHubName%
consumerGroup 選擇性屬性,可設定用來訂閱中樞內事件的取用者群組。 如果省略,則會使用 $Default 取用者群組。
基數 many設定為以啟用批處理。 如果省略或設定為 one,則會將單一訊息傳遞至 函式。
connection 應用程式設定或設定集合的名稱,指定如何連線到事件中樞。 請參閱連線

當您在本機開發時,請在集合中的 local.settings.json 檔案Values中新增應用程式設定。

使用方式

若要深入瞭解事件中樞如何觸發和 IoT 中樞 觸發程序調整,請參閱使用 Azure Functions 取用事件。

事件中樞輸出系結所支援的參數類型取決於 Functions 運行時間版本、擴充套件版本,以及所使用的 C# 形式。

當您想要讓函式處理單一事件時,事件中樞觸發程式可以繫結至下列類型:

類型 描述
string 事件做為字串。 當事件是簡單的文字時,請使用 。
byte[] 事件的位元組。
JSON 可序列化型別 當事件包含 JSON 數據時,Functions 會嘗試將 JSON 數據還原串行化為一般舊的 CLR 物件 (POCO) 類型。
Azure.Messaging.EventHubs.EventData1 事件物件。
如果您要從任何舊版的事件中樞 SDK 移轉,請注意,此版本會捨棄舊版類型的支援 Body ,而支援 EventBody

當您想要讓函式處理一批事件時,事件中樞觸發程式可以繫結至下列類型:

類型 描述
string[] 批次中的事件陣列,做為字串。 每個專案都代表一個事件。
EventData[]1 批次中的事件陣列,做為 Azure.Messaging.EventHubs.EventData實例。 每個專案都代表一個事件。
T[] 其中 T 是 JSON 可串行化類型1 批次中的事件陣列,做為自定義POCO類型的實例。 每個專案都代表一個事件。

1 若要使用這些類型,您必須參考 Microsoft.Azure.Functions.Worker.Extensions.EventHubs 5.5.0 或更新版本 ,以及 SDK 類型系結的常見相依性。

參數類型可以是下列其中一項:

  • 任何原生 Java 類型,例如 int、String、byte[]。
  • 使用選擇性的可為 Null 值。
  • 任何 POJO 類型。

若要深入瞭解,請參閱 EventHubTrigger 參考。

事件元數據

事件中樞觸發程式提供數個 元數據屬性。 元數據屬性可用來做為其他系結中系結表達式的一部分,或做為程式代碼中的參數。 屬性來自 EventData 類別。

屬性 類型​ 描述
PartitionContext PartitionContext PartitionContext 執行個體。
EnqueuedTimeUtc DateTime UTC 加入佇列的時間。
Offset string 相對於事件中樞數據分割數據流的數據位移。 位移是事件中樞數據流內事件的標記或標識碼。 標識元在事件中樞數據流的數據分割內是唯一的。
PartitionKey string 事件數據應該傳送至其中的分割區。
Properties IDictionary<String,Object> 事件數據的用戶屬性。
SequenceNumber Int64 事件的邏輯序號。
SystemProperties IDictionary<String,Object> 系統屬性,包括事件數據。

請參閱 本文稍早使用這些屬性的程式代碼範例

連線

屬性 connection 是環境組態的參考,指定應用程式應該如何連線到事件中樞。 此屬性可以指定:

如果設定的值與單一設定完全相符,又與其他設定的開頭相符,則會使用完全相符項目。

Connection string

按兩下命名空間[連線資訊] 按鈕,而非事件中樞本身,以取得此 連接字串。 連接字串 必須是事件中樞命名空間,而不是事件中樞本身。

用於觸發程式時,連接字串 至少必須具有「讀取」許可權才能啟動函式。 當用於輸出系結時,連接字串 必須具有「傳送」許可權,才能將訊息傳送至事件數據流。

這個 連接字串 應該儲存在應用程式設定中,其名稱符合系結組態的 屬性所connection指定的值。

身分識別型連線

如果您使用 5.x 版或更高版本的延伸模組,而不是使用具有秘密的 連接字串,您可以讓應用程式使用 Microsoft Entra 身分識別。 若要執行此動作,您會在對應至觸發程序和繫結設定中 connection 屬性的通用前置詞下定義設定。

在此模式中,延伸模組需要下列屬性:

屬性 環境變數範本 描述 範例值
完整Namespace <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace 完整事件中樞命名空間。 myeventhubns.servicebus.windows.net

還可以設定其他屬性來自訂連線。 請參閱身分識別型連線的通用屬性

注意

使用 Azure 應用程式組態Key Vault 來提供「受控識別」連線的設定時,設定名稱應使用有效的索引鍵分隔符號,例如::/ 取代 __,以確保正確解析名稱。

例如: <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace

主控於 Azure Functions 服務時,以身分識別為基礎的連接會使用受控識別。 雖然可以使用 credentialclientID 屬性指定使用者指派的身分識別,但預設會使用系統指派的身分識別。 請注意,支援以資源識別碼來設定使用者指派的身分識別。 在本機開發等其他內容中執行時,雖然這可以自訂,但仍會改用您的開發人員身分識別。 請參閱使用身分識別型連線進行本機開發

授與權限給身分識別

正在使用的任何身分識別,都必須具有執行預期動作的權限。 有關大多數 Azure 服務,意即您需要指派 Azure RBAC 的角色,利用提供這些權限的內建或自訂角色。

重要

部分權限可能會由所有內容都不需要的目標服務公開。 可以的話,請遵循最低權限原則,只授與身分識別所需的權限。 例如,如果應用程式只需要能夠讀取資料來源,請使用只有讀取權限的角色。 不宜指派也允許寫入該服務的角色,因為讀取作業不需要這麼多權限。 同樣地,最好確保角色指派的範圍僅限於需要讀取的資源。

您必需建立可在執行階段存取事件中樞的角色指派。 角色指派的範圍可以針對事件中樞命名空間,或事件中樞本身。 擁有者等的管理角色不足。 下方資料表顯示一般作業中使用事件中樞延伸模組時建議的內建角色。 您的應用程式可能會根據您寫入的程式碼要求額外的權限。

繫結類型 內建角色範例
觸發程序 Azure 事件中樞資料接收者Azure 事件中樞資料擁有者
輸出繫結 Azure 事件中樞資料傳送者

host.json 設定

host.json檔案包含控制事件中樞觸發行為的設定。 如需可用設定的詳細資訊,請參閱host.json設定一節。

下一步