你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

适用于 .NET 的 Azure WebJobs 事件中心客户端库 - 版本 6.0.1

此扩展提供从 Azure 函数访问Azure 事件中心的功能。

入门

安装包

使用 NuGet 安装事件中心扩展:

dotnet add package Microsoft.Azure.WebJobs.Extensions.EventHubs

先决条件

  • Azure 订阅:若要使用 Azure 服务(包括 Azure 事件中心),需要订阅。 如果没有现有的 Azure 帐户,可以在创建帐户时注册免费试用版或使用 Visual Studio 订阅权益。

  • 包含事件中心的事件中心命名空间:若要与Azure 事件中心交互,还需要提供命名空间和事件中心。 如果不熟悉如何创建 Azure 资源,可能需要按照使用 Azure 门户创建事件中心的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建事件中心的详细说明。

  • 具有 Blob 存储的 Azure 存储帐户: 若要在 Azure 存储中将检查点保留为 Blob,需要有一个 Azure 存储帐户,其中包含可用的 Blob。 如果不熟悉 Azure 存储帐户,可能需要按照使用 Azure 门户创建存储帐户的分步指南进行操作。 还可以在此处找到有关使用 Azure CLI、Azure PowerShell或 Azure 资源管理器 (ARM) 模板创建存储帐户的详细说明。

部署按钮

对客户端进行身份验证

若要使事件中心客户端库与事件中心交互,需要了解如何与之连接和授权。 执行此操作的最简单方法是使用连接字符串,该连接字符串是在创建事件中心命名空间时自动创建的。 如果不熟悉将连接字符串与事件中心配合使用,可能需要按照分步指南获取事件中心连接字符串

ConnectionEventHubTriggerAttributeEventHubAttribute 属性用于指定存储连接字符串的配置属性。

连接字符串AzureWebJobsStorage用于保留处理检查点信息。

对于本地开发,local.settings.json请使用 文件来存储连接字符串:

{
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "<connection_name>": "Endpoint=sb://<event_hubs_namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Jya7Eh76HU92ibsxuk1ITN8CM8Bt76YLKf5ISjU3jZ8="
  }
}

部署后,使用应用程序设置设置连接字符串。

基于标识的身份验证

如果环境已启用 托管标识 ,则可以使用它对事件中心扩展进行身份验证。 在执行此操作之前,需要确保已按照Azure Functions开发人员指南中所述配置权限。

若要使用基于标识的身份验证,请 <connection_name>__fullyQualifiedNamespace 提供配置设置。

{
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "<connection_name>__fullyQualifiedNamespace": "{event_hubs_namespace}.servicebus.windows.net"
  }
}

或者在部署应用的情况下,在 应用程序设置中设置相同的设置:

<connection_name>__fullyQualifiedNamespace={event_hubs_namespace}.servicebus.windows.net

有关配置基于标识的连接的详细信息,可 在此处找到。

关键概念

事件中心触发器

事件中心触发器允许在消息发送到事件中心时执行函数。

请按照Azure 事件中心触发器教程了解有关事件中心触发器的详细信息。

事件中心输出绑定

事件中心输出绑定允许函数发送事件中心事件。

请按照Azure 事件中心输出绑定了解有关事件中心绑定的详细信息。

支持的类型

触发器和输出绑定支持以下类型:

  • EventData
  • string - 将使用 UTF8 编码对值进行编码
  • BinaryData
  • byte[]
  • 自定义模型类型将使用 Newtonsoft.Json 进行 JSON 序列化
  • IAsyncCollector<T> 上述任何批处理触发器类型
  • EventHubProducerClient 输出绑定

示例

发送单个事件

可以通过应用 EventHubAttribute 函数返回值将单个事件发送到事件中心。 返回值可以是 stringEventData 类型。 使用返回值时,可能无法指定分区键;为此,需要绑定到 IAsyncCollector<EventData>,如 发送多个事件中所示。

[FunctionName("BindingToReturnValue")]
[return: EventHub("<event_hub_name>", Connection = "<connection_name>")]
public static string Run([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer)
{
    // This value would get stored in EventHub event body.
    // The string would be UTF8 encoded
    return $"C# Timer trigger function executed at: {DateTime.Now}";
}

发送多个事件

若要从单个 Azure 函数调用发送多个事件,可以将 应用于 EventHubAttributeIAsyncCollector<string>IAsyncCollector<EventData> 参数。 只能在绑定到 IAsyncCollector<EventData>时使用分区键。

[FunctionName("BindingToCollector")]
public static async Task Run(
    [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
    [EventHub("<event_hub_name>", Connection = "<connection_name>")] IAsyncCollector<EventData> collector)
{
    // When no partition key is used, partitions will be assigned per-batch via round-robin.
    await collector.AddAsync(new EventData($"Event 1 added at: {DateTime.Now}"));
    await collector.AddAsync(new EventData($"Event 2 added at: {DateTime.Now}"));

    // Using a partition key will help group events together; events with the same key
    // will always be assigned to the same partition.
    await collector.AddAsync(new EventData($"Event 3 added at: {DateTime.Now}"), "sample-key");
    await collector.AddAsync(new EventData($"Event 4 added at: {DateTime.Now}"), "sample-key");
}

使用绑定到强类型模型

若要使用具有 EventHub 绑定的强类型模型类,请将 EventHubAttribute 应用于模型参数。

[FunctionName("TriggerSingleModel")]
public static void Run(
    [EventHubTrigger("<event_hub_name>", Connection = "<connection_name>")] Dog dog,
    ILogger logger)
{
    logger.LogInformation($"Who's a good dog? {dog.Name} is!");
}

使用 EventHubProducerClient 发送多个事件

还可以直接绑定到 , EventHubProducerClient 以对事件发送拥有最大控制权。

[FunctionName("BindingToProducerClient")]
public static async Task Run(
    [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
    [EventHub("<event_hub_name>", Connection = "<connection_name>")] EventHubProducerClient eventHubProducerClient)
{
    // IAsyncCollector allows sending multiple events in a single function invocation
    await eventHubProducerClient.SendAsync(new[]
    {
        new EventData($"Event 1 added at: {DateTime.Now}"),
        new EventData($"Event 2 added at: {DateTime.Now}")
    });
}

按事件触发器

若要在每次将事件发送到事件中心时运行函数,EventHubTriggerAttributestring请将 应用于 或 EventData 参数。

[FunctionName("TriggerSingle")]
public static void Run(
    [EventHubTrigger("<event_hub_name>", Connection = "<connection_name>")] string eventBodyAsString,
    ILogger logger)
{
    logger.LogInformation($"C# function triggered to process a message: {eventBodyAsString}");
}

批处理触发器

若要为接收的一批事件运行函数,请将 EventHubTriggerAttributestring[] 应用于 或 EventData[] 参数。

[FunctionName("TriggerBatch")]
public static void Run(
    [EventHubTrigger("<event_hub_name>", Connection = "<connection_name>")] EventData[] events,
    ILogger logger)
{
    foreach (var e in events)
    {
        logger.LogInformation($"C# function triggered to process a message: {e.EventBody}");
        logger.LogInformation($"EnqueuedTime={e.EnqueuedTime}");
    }
}

疑难解答

有关故障排除指南,请参阅监视Azure Functions

后续步骤

阅读Azure Functions或创建 Azure 函数指南简介

贡献

有关构建、测试和参与此库的详细信息,请参阅我们的 CONTRIBUTING.md

本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 cla.microsoft.com

此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。

曝光数