Desencadenador de Azure Event Hubs para Azure Functions

En este artículo se explica cómo usar el desencadenador de Azure Event Hubs para Azure Functions. Azure Functions admite los enlaces de salida y desencadenador para Event Hubs.

Para obtener información sobre los detalles de instalación y configuración, vea la información general.

Use el desencadenador de funciones para responder a un evento enviado a una secuencia de eventos del centro de eventos. Debe tener acceso de lectura al centro de eventos subyacente para configurar el desencadenador. Cuando esta función se desencadena, el mensaje que se pasa a la función se escribe como una cadena.

Las decisiones de escalado de Event Hubs para los planes de Consumo y Premium se realizan a través del escalado basado en el destino. Para obtener más información, consulte Escalado basado en el destino.

Para obtener información sobre cómo Azure Functions responde a los eventos enviados a un flujo de eventos del centro de eventos mediante desencadenadores, consulte Integración de Event Hubs con funciones sin servidor en Azure.

Importante

En este artículo se usan pestañas para admitir varias versiones del modelo de programación de Node.js. El modelo v4 está disponible de forma general y está diseñado para que los desarrolladores de JavaScript y TypeScript tengan una experiencia más flexible e intuitiva. Para más detalles acerca de cómo funciona el modelo v4, consulte la Guía para desarrolladores de Node.js de Azure Functions. Para más información sobre las diferencias entre v3 y v4, consulte la Guía de migración.

Azure Functions admite dos modelos de programación para Python. La forma en que defina los enlaces depende del modelo de programación seleccionado.

El modelo de programación de Python v2 permite definir enlaces mediante decoradores directamente en el código de función de Python. Para más información, consulte la Guía para desarrolladores de Python.

En este artículo se admiten los modelos de programación.

Ejemplo

En el ejemplo siguiente se muestra una función de C# que se desencadena en función de un centro de eventos, donde la cadena de mensaje de entrada se escribe en los registros:

{
    private readonly ILogger<EventHubsFunction> _logger;

    public EventHubsFunction(ILogger<EventHubsFunction> logger)
    {
        _logger = logger;
    }

    [Function(nameof(EventHubFunction))]
    [FixedDelayRetry(5, "00:00:10")]
    [EventHubOutput("dest", Connection = "EventHubConnection")]
    public string EventHubFunction(
        [EventHubTrigger("src", Connection = "EventHubConnection")] string[] input,
        FunctionContext context)
    {
        _logger.LogInformation("First Event Hubs triggered message: {msg}", input[0]);

        var message = $"Output message created at {DateTime.Now}";
        return message;
    }

En el ejemplo siguiente se muestra una función TypeScript del desencadenador de Event Hubs. La función lee los metadatos del evento y registra el mensaje.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(message: unknown, context: InvocationContext): Promise<void> {
    context.log('Event hub function processed message:', message);
    context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
    context.log('Offset =', context.triggerMetadata.offset);
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: eventHubTrigger1,
});

Para recibir eventos en un lote, establezca cardinality en many, como se muestra en el ejemplo siguiente.

import { app, InvocationContext } from '@azure/functions';

export async function eventHubTrigger1(messages: unknown[], context: InvocationContext): Promise<void> {
    context.log(`Event hub function processed ${messages.length} messages`);
    for (let i = 0; i < messages.length; i++) {
        context.log('Event hub message:', messages[i]);
        context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
        context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
        context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
    }
}

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: eventHubTrigger1,
});

En el ejemplo siguiente se muestra una función JavaScript del desencadenador de Event Hubs. La función lee los metadatos del evento y registra el mensaje.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'one',
    handler: (message, context) => {
        context.log('Event hub function processed message:', message);
        context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
        context.log('SequenceNumber =', context.triggerMetadata.sequenceNumber);
        context.log('Offset =', context.triggerMetadata.offset);
    },
});

Para recibir eventos en un lote, establezca cardinality en many, como se muestra en el ejemplo siguiente.

const { app } = require('@azure/functions');

app.eventHub('eventHubTrigger1', {
    connection: 'myEventHubReadConnectionAppSetting',
    eventHubName: 'MyEventHub',
    cardinality: 'many',
    handler: (messages, context) => {
        context.log(`Event hub function processed ${messages.length} messages`);
        for (let i = 0; i < messages.length; i++) {
            context.log('Event hub message:', messages[i]);
            context.log(`EnqueuedTimeUtc = ${context.triggerMetadata.enqueuedTimeUtcArray[i]}`);
            context.log(`SequenceNumber = ${context.triggerMetadata.sequenceNumberArray[i]}`);
            context.log(`Offset = ${context.triggerMetadata.offsetArray[i]}`);
        }
    },
});

Este es el código de PowerShell:

param($eventHubMessages, $TriggerMetadata)

Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"

$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }

En el ejemplo siguiente se muestra un enlace de desencadenador de Event Hubs y una función de Python que usa el enlace. La función lee los metadatos del evento y registra el mensaje. El ejemplo depende de si usa el modelo de programación de Python v1 o v2.

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub", 
                               event_hub_name="<EVENT_HUB_NAME>",
                               connection="<CONNECTION_SETTING>") 
def test_function(myhub: func.EventHubEvent):
    logging.info('Python EventHub trigger processed an event: %s',
                myhub.get_body().decode('utf-8'))

En el ejemplo siguiente se muestra un enlace del desencadenador de Event Hubs que registra el cuerpo del mensaje del desencadenador de Event Hubs.

@FunctionName("ehprocessor")
public void eventHubProcessor(
  @EventHubTrigger(name = "msg",
                  eventHubName = "myeventhubname",
                  connection = "myconnvarname") String message,
       final ExecutionContext context )
       {
          context.getLogger().info(message);
 }

En la biblioteca de runtime de funciones de Java, use la anotación EventHubTrigger en parámetros cuyo valor proviene del centro de eventos. Los parámetros con estas anotaciones hacen que la función se ejecuta cuando llega un evento. Esta anotación se puede usar con tipos nativos de Java, POJO o valores que aceptan valores NULL mediante Optional<T>.

En el ejemplo siguiente se muestra un amplio uso de SystemProperties y otras opciones de enlace para una mayor introspección del evento, además de proporcionar una ruta de acceso bien formada BlobOutput que es jerárquica Date.

package com.example;
import java.util.Map;
import java.time.ZonedDateTime;

import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

/**
 * Azure Functions with Event Hub trigger.
 * and Blob Output using date in path along with message partition ID
 * and message sequence number from EventHub Trigger Properties
 */
public class EventHubReceiver {

    @FunctionName("EventHubReceiver")
    @StorageAccount("bloboutput")

    public void run(
            @EventHubTrigger(name = "message",
                eventHubName = "%eventhub%",
                consumerGroup = "%consumergroup%",
                connection = "eventhubconnection",
                cardinality = Cardinality.ONE)
            String message,

            final ExecutionContext context,

            @BindingName("Properties") Map<String, Object> properties,
            @BindingName("SystemProperties") Map<String, Object> systemProperties,
            @BindingName("PartitionContext") Map<String, Object> partitionContext,
            @BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,

            @BlobOutput(
                name = "outputItem",
                path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
                       "{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
            OutputBinding<String> outputItem) {

        var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
                                                             // indicator
        context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
        context.getLogger().info("Properties: " + properties);
        context.getLogger().info("System Properties: " + systemProperties);
        context.getLogger().info("partitionContext: " + partitionContext);
        context.getLogger().info("EnqueuedTimeUtc: " + et);

        outputItem.setValue(message);
    }
}

Atributos

Las bibliotecas de C# de procesos de trabajo aislados y en proceso usan un atributo para configurar el desencadenador. En su lugar, el script de C# usa un archivo de configuración function.json como se describe en la guía de scripting de C#.

Use el atributo EventHubTriggerAttribute para definir un desencadenador en un centro de eventos, que admite las siguientes propiedades.

Parámetros Descripción
EventHubName El nombre del centro de eventos. Cuando el nombre del centro de eventos también está presente en la cadena de conexión, ese valor reemplaza esta propiedad en tiempo de ejecución. Se puede hacer referencia a él en la configuración de la aplicación, como %eventHubName%.
ConsumerGroup Una propiedad opcional que establece el grupo de consumidores que se usará para suscribirse a los eventos del centro. Si se pasa por alto, se utilizará el grupo de consumidores $Default.
Connection Nombre de una configuración de aplicación o de una colección de configuraciones de aplicación que especifica cómo conectarse a Event Hubs. Para más información, consulte Conexiones.

Elementos Decorator

Solo se aplica al modelo de programación de Python v2.

Para las funciones de Python v2 definidas mediante un decorador, se aplican las siguientes propiedades en cosmos_db_trigger:

Propiedad Descripción
arg_name Nombre de la variable que representa el elemento de evento en el código de la función.
event_hub_name El nombre del centro de eventos. Cuando el nombre del centro de eventos también está presente en la cadena de conexión, ese valor reemplaza esta propiedad en tiempo de ejecución.
connection Nombre de una configuración de aplicación o de una colección de configuraciones de aplicación que especifica cómo conectarse a Event Hubs. Consulte Conexiones.

Para las funciones de Python definidas mediante function.json, consulte la sección Configuración.

anotaciones

En la biblioteca de tiempo de ejecución de funciones de Java, use la anotación EventHubTrigger, que admite la siguiente configuración:

Configuración

Solo se aplica al modelo de programación de Python v1.

En la tabla siguiente se explican las propiedades que puede establecer en el objeto options que se pasa al métodoapp.eventHub().

Propiedad Descripción
eventHubName El nombre del centro de eventos. Cuando el nombre del centro de eventos también está presente en la cadena de conexión, ese valor reemplaza esta propiedad en tiempo de ejecución. Se puede hacer referencia a él mediante la configuración de la aplicación%eventHubName%
consumerGroup Una propiedad opcional que establece el grupo de consumidores que se usará para suscribirse a los eventos del centro. Si se pasa por alto, se utilizará el grupo de consumidores $Default.
cardinalidad Defínalo como many para permitir el procesamiento por lotes. Si se omite o se define como one, se pasa un único mensaje a la función.
connection Nombre de una configuración de aplicación o de una colección de configuraciones de aplicación que especifica cómo conectarse a Event Hubs. Consulte Conexiones.

En la siguiente tabla se explican las propiedades de configuración del desencadenador que definió en el archivo function.json, que difiere según la versión de tiempo de ejecución.

Propiedad de function.json Descripción
type Se debe establecer en eventHubTrigger. Esta propiedad se establece automáticamente cuando se crea el desencadenador en Azure Portal.
direction Se debe establecer en in. Esta propiedad se establece automáticamente cuando se crea el desencadenador en Azure Portal.
name Nombre de la variable que representa el elemento de evento en el código de la función.
eventHubName El nombre del centro de eventos. Cuando el nombre del centro de eventos también está presente en la cadena de conexión, ese valor reemplaza esta propiedad en tiempo de ejecución. Se puede hacer referencia a él mediante la configuración de la aplicación%eventHubName%
consumerGroup Una propiedad opcional que establece el grupo de consumidores que se usará para suscribirse a los eventos del centro. Si se pasa por alto, se utilizará el grupo de consumidores $Default.
cardinalidad Defínalo como many para permitir el procesamiento por lotes. Si se omite o se define como one, se pasa un único mensaje a la función.
connection Nombre de una configuración de aplicación o de una colección de configuraciones de aplicación que especifica cómo conectarse a Event Hubs. Consulte Conexiones.

Cuando esté desarrollando localmente, agregue la configuración de la aplicación en el archivo local.settings.json de la colección Values.

Uso

Para obtener más información sobre cómo se escala el desencadenador de Event Hubs y el desencadenador de IoT Hub, consulte Consumo de eventos con Azure Functions.

El tipo de parámetro admitido por el enlace de salida de Event Hubs depende de la versión del tiempo de ejecución de Functions, la versión del paquete de extensión y la modalidad de C# utilizada.

Cuando quiera que la función procese un único evento, el desencadenador de Event Hubs puede enlazarse a los siguientes tipos:

Tipo Descripción
string El evento como una cadena. Se usa cuando el evento es de texto simple.
byte[] Bytes del evento.
Tipos serializables con JSON Cuando un evento contenga datos JSON, Functions intentará deserializar los datos JSON en un tipo de objeto CLR sin formato (POCO).
Azure.Messaging.EventHubs.EventData1 El objeto de evento.
Si va a migrar desde cualquier versión anterior de los SDK de Event Hubs, tenga en cuenta que esta versión quitará la compatibilidad con el tipo Body heredado en favor de EventBody.

Cuando quiera que la función procese un lote de eventos, podrá enlazarse el desencadenador de Event Hubs a los siguientes tipos:

Tipo Descripción
string[] Una matriz de eventos del lote, como cadenas. Cada entrada representa un evento.
EventData[]1 Matriz de eventos del lote, como instancias de Azure.Messaging.EventHubs.EventData. Cada entrada representa un evento.
T[] donde T es un tipo JSON serializable1 Matriz de eventos del lote, como instancias de un tipo POCO personalizado. Cada entrada representa un evento.

1 Para utilizar estos tipos, debe hacer referencia a Microsoft.Azure.Functions.Worker.Extensions.EventGrid 5.5.0 o posterior y a las dependencias comunes para los enlaces de tipo SDK.

Este tipo de parámetro puede ser uno de los siguientes:

  • Cualquier tipo nativo de Java como int, String, byte[].
  • Valores que admiten valores NULL mediante Optional.
  • Cualquier tipo POJO.

Para obtener más información, consulte EventHubTrigger.

Metadatos de evento

El desencadenador de Event Hubs proporciona varias propiedades de metadatos. Se pueden usar propiedades de metadatos como parte de expresiones de enlace en otros enlaces o como parámetros del código. Las propiedades proceden de la clase EventData.

Propiedad Tipo Descripción
PartitionContext PartitionContext Instancia de PartitionContext.
EnqueuedTimeUtc DateTime Hora de puesta en la cola en UTC.
Offset string El desplazamiento de los datos relacionados con el flujo de partición del centro de eventos. El desplazamiento es un marcador o identificador del flujo de Event Hubs. El identificador es único dentro de una partición del flujo de Event Hubs.
PartitionKey string La partición a la que se deben enviar los datos del evento.
Properties IDictionary<String,Object> Las propiedades de usuario de los datos del evento.
SequenceNumber Int64 El número de secuencia de registro del evento.
SystemProperties IDictionary<String,Object> Las propiedades del sistema, incluidos los datos del evento.

Consulte los ejemplos de código que utilizan estas propiedades más arriba en este artículo.

Conexiones

La propiedad connection es una referencia a la configuración del entorno que especifica cómo se debe conectar la aplicación a Event Hubs. Puede especificar lo siguiente:

Si el valor configurado es tanto una coincidencia exacta de una única configuración como una coincidencia de prefijo de otras configuraciones, se usa la coincidencia exacta.

Cadena de conexión

Para obtener esta cadena de conexión, haga clic en el botón Información de conexión del espacio de nombres, no del propio centro de eventos. La cadena de conexión debe ser para un espacio de nombres de Event Hubs, no para el propio centro de eventos.

Cuando se usa para desencadenadores, la cadena de conexión debe tener al menos permisos de "lectura" para activar la función. Cuando se usa para los enlaces de salida, la cadena de conexión debe tener permisos de "envío" para enviar mensajes al flujo de eventos.

La cadena de conexión debe almacenarse en una configuración de la aplicación con un nombre que coincida con el valor especificado por la propiedad connection de la configuración de enlace.

Conexiones basadas en identidades

Si usa versión 5.x o posterior de la extensión, en lugar de usar una cadena de conexión con un secreto, puede hacer que la aplicación use una Identidad de Microsoft Entra. Para ello, definiría la configuración con un prefijo común que se asigne a la propiedad connection en la configuración de desencadenador y enlace.

En este modo, la extensión requiere las siguientes propiedades:

Propiedad Plantilla de variable de entorno Descripción Valor de ejemplo
Espacio de nombres completo <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace Espacio de nombres completo de Event Hubs. myeventhubns.servicebus.windows.net

Se pueden establecer propiedades adicionales para personalizar la conexión. Consulte Propiedades comunes para conexiones basadas en identidades.

Nota

Al usar Azure App Configuration o Key Vault para proporcionar la configuración de las conexiones de identidad administrada, los nombres de configuración deben usar un separador de clave válido, como : o / en lugar de __ para asegurarse de que los nombres se resuelven correctamente.

Por ejemplo, <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace.

Cuando se hospeda en el servicio de Azure Functions, las conexiones basadas en identidades usan una identidad administrada. La identidad asignada por el sistema se usa de manera predeterminada, aunque se puede especificar una identidad asignada por el usuario con las propiedades credential y clientID. Tenga en cuenta que no se admite la configuración de una identidad asignada por el usuario con un identificador de recurso. Cuando se ejecuta en otros contextos, como el de desarrollo local, se usa en su lugar la identidad del desarrollador, aunque se puede personalizar. Consulte Desarrollo local con conexiones basadas en identidades.

Concesión de permiso a la identidad

Cualquier identidad que se utilice debe tener permisos para realizar las acciones previstas. Para la mayoría de los servicios de Azure, esto significa que debe asignar un rol en Azure RBAC mediante roles integrados o personalizados que proporcionen esos permisos.

Importante

Es posible que el servicio de destino muestre algunos permisos que no son necesarios para todos los contextos. Siempre que sea posible, respete el principio de privilegios mínimos y conceda solo los privilegios necesarios a la identidad. Por ejemplo, si la aplicación solo necesita poder leer desde un origen de datos, use un rol que solo tenga permiso de lectura. Sería inadecuado asignar un rol que también permita escribir en ese servicio, ya que sería un permiso excesivo para una operación de lectura. De forma similar, le interesa asegurarse de que la asignación de roles esté limitada solo a los recursos que se deben leer.

Deberá crear una asignación de roles que proporcione acceso al centro de eventos en tiempo de ejecución. El ámbito de la asignación de roles puede ser para un espacio de nombres de Event Hubs o para el propio centro de eventos. Los roles de administración, como Propietario, no son suficientes. En la tabla siguiente se muestran los roles integrados que se recomiendan al usar la extensión de Event Hubs con un funcionamiento normal. Puede que la aplicación precise permisos adicionales en función del código que escriba.

Tipo de enlace Roles integrados de ejemplo
Desencadenador Receptor de los datos de Azure Event Hubs, Propietario de los datos de Azure Event Hubs
Enlace de salida Emisor de datos de Azure Event Hubs

configuración de host.json

El archivo host.json contiene opciones de configuración que controlan el comportamiento de Event Hubs. Consulte la sección de configuración de host.json para más información sobre las opciones de configuración disponibles.

Pasos siguientes