Tutorial: Executar o Azure Functions a partir dos trabalhos do Azure Stream Analytics

É possível executar o Azure Functions a partir do Azure Stream Analytics configurando o Azure Functions como um dos coletores (saídas) para o trabalho do Stream Analytics. O Azure Functions é uma experiência de computação sob demanda controlada por evento que permite implementar o código que é disparado por eventos que ocorrem no Azure ou em serviços de terceiros. Essa capacidade do Azure Functions de responder a gatilhos faz dele uma saída natural para trabalhos do Stream Analytics.

O Stream Analytics invoca o Azure Functions por meio de gatilhos HTTP. O adaptador de saída do Azure Functions permite que os usuários o conectem ao Stream Analytics, de modo que os eventos possam ser disparados com base em consultas do Stream Analytics.

Observação

Não há suporte para a conexão com o Azure Functions em uma VNet (rede virtual) por meio de um trabalho do Stream Analytics que esteja sendo executado em um cluster multilocatário.

Neste tutorial, você aprenderá como:

  • Criar e executar um trabalho do Stream Analytics
  • Criar uma instância de Cache do Azure para Redis
  • Criar uma Função do Azure
  • Verificar o Cache do Azure para Redis quanto aos resultados

Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.

Configurar um trabalho do Stream Analytics para executar uma função

Esta seção demonstra como configurar um trabalho do Stream Analytics para executar uma função que grava dados no Cache do Azure para Redis. O trabalho do Stream Analytics lê eventos dos Hubs de Eventos do Azure e executa uma consulta que invoca a função. Esta função lê dados do trabalho do Stream Analytics e os grava no Cache do Azure para Redis.

Diagram showing relationships among the Azure services

Pré-requisitos

Antes de começar, verifique se você concluiu as seguintes etapas:

  • Se você não tiver uma assinatura do Azure, crie uma conta gratuita.
  • Faça o download do aplicativo gerador de evento de chamada telefônicaTelcoGenerator.zip do Centro de Download da Microsoft ou obtenha o código-fonte no GitHub.

Entrar no Azure

Entre no portal do Azure.

Criar um Hub de Evento

Você deve enviar os dados de exemplo para um hub de eventos antes que o Stream Analytics possa analisar o fluxo de dados de chamadas fraudulentas. Neste tutorial, você envia dados ao Azure usando o Hubs de Eventos do Azure.

Use as seguintes etapas para criar um hub de eventos e enviar dados de chamada para ele:

  1. Entre no portal do Azure.

  2. Selecione Todos os serviços no menu à esquerda, selecione Internet das coisas, passe o sobre Hubs de Eventos e, em seguida, selecione o botão + (Adicionar).

    Screenshot showing the Event Hubs creation page.

  3. Na página Criar Namespace, siga estas etapas:

    1. Selecione uma assinatura do Azure em que deseja criar o hub de eventos.

    2. Em Grupo de recursos, escolha Criar e insira um nome para o grupo de recursos. O namespace dos Hubs de Eventos é criado neste grupo de recursos.

    3. Em Nome do namespace, insira um nome exclusivo para o namespace dos Hubs de Eventos.

    4. Em Localização, selecione a região na qual deseja criar o namespace.

    5. Em Tipo de preço, selecione Standard.

    6. Selecione Revisar + criar na parte inferior da página.

      Screenshot showing the Create Namespace page.

    7. Na página Revisar + criar do assistente de criação de namespace, selecione Criar na parte inferior da página depois de revisar todas as configurações.

  4. Depois que o namespace for implantado com êxito, selecione Ir para o recurso para navegar até a página Namespace dos Hubs de Eventos.

  5. Na página Namespace de Hubs de Eventos, selecione +Hub de Eventos na barra de comando.

    Screenshot showing the Add event hub button on the Event Hubs Namespace page.

  6. Na página Criar Hub de Eventos, insira um nome para o Hub de Eventos. Defina a Contagem de Partições como 2. Use as opções padrão nas configurações restantes e escolha Revisar + criar.

    Screenshot showing the Create event hub page.

  7. Na página Examinar + criar, selecione Criar na parte inferior da página. Aguarde até que a implantação tenha êxito.

Conceder acesso para o hub de eventos e obter uma cadeia de caracteres de conexão

Antes que um aplicativo possa enviar dados aos Hubs de Eventos do Azure, o hub de eventos deve ter uma política que permita o acesso. A política de acesso produz uma cadeia de conexão que inclui informações de autorização.

  1. Na página Namespace dos Hubs de Eventos, selecione Políticas de acesso compartilhado no menu à esquerda.

  2. Selecione RootManageSharedAccessKey na lista de políticas.

  3. Então, selecione o botão “Copiar” ao lado de Cadeia de conexão – chave primária.

  4. Cole a cadeia de conexão em um editor de texto. Você precisará dessa cadeia de conexão na próxima seção.

    A cadeia de conexão tem esta aparência:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Observe que a cadeia de conexão contém vários pares de chave-valor separados por ponto e vírgula: Endpoint, SharedAccessKeyName e SharedAccessKey.

Iniciar o aplicativo gerador de evento

Antes de iniciar o aplicativo TelcoGenerator, configure-o para enviar dados para os Hubs de Eventos do Azure criados anteriormente.

  1. Extraia o conteúdo do arquivo TelcoGenerator.zip.

  2. Abra o arquivo TelcoGenerator\TelcoGenerator\telcodatagen.exe.config em um editor de texto de sua escolha Há mais de um arquivo .config, portanto verifique se abriu o arquivo correto.

  3. Atualize o elemento <appSettings> no arquivo de configuração com os seguintes detalhes:

    • Defina o valor da chave EventHubName como o valor de EntityPath no final da cadeia de conexão.
    • Defina o valor da chave Microsoft.ServiceBus.ConnectionString para a cadeia de conexão sem o valor de EntityPath (;EntityPath=myeventhub) no final. Não se esqueça de remover o ponto e vírgula que precede o valor de EntityPath.
  4. Salve o arquivo.

  5. Em seguida, abra uma janela de comando e altere para a pasta onde o aplicativo TelcoGenerator foi descompactado. Em seguida, digite o seguinte comando:

    .\telcodatagen.exe 1000 0.2 2
    

    Esse comando usa os seguintes parâmetros:

    • Número de registros de dados de chamadas por hora.
    • Porcentagem de probabilidade de fraude, que é a frequência com que o aplicativo deve simular uma chamada fraudulenta. O valor 0,2 significa que cerca de 20% dos registros de chamada parecem ser fraudulentos.
    • Duração em horas, que é o número de horas em que o aplicativo deve ser executado. Também é possível interromper o aplicativo a qualquer momento encerrando o processo (Ctrl+C) na linha de comando.

    Depois de alguns segundos, o aplicativo é iniciado exibindo registros de chamada telefônica na tela, enquanto envia para o hub de eventos. Os dados de chamadas telefônicas contêm os seguintes campos:

    Registro Definição
    CallrecTime Carimbo de data/hora para a hora de início da chamada.
    SwitchNum Chave do telefone usada para se conectar à chamada. Neste exemplo, as opções são cadeias de caracteres que representam o país/região de origem (Estados Unidos, China, Reino Unido, Alemanha ou Austrália).
    CallingNum Número de telefone do autor da chamada.
    CallingIMSI A Identidade do Assinante Móvel Internacional (IMSI). É um identificador exclusivo do autor da chamada.
    CalledNum O número de telefone do destinatário da chamada.
    CalledIMSI Identidade do Assinante Móvel Internacional (IMSI). É um identificador exclusivo do destinatário da chamada.

Criar um trabalho de Stream Analytics

Agora que você tem um fluxo de eventos de chamada, pode criar um trabalho do Stream Analytics que lê dados do hub de eventos.

  1. Para criar um trabalho do Stream Analytics, navegue até o portal do Azure.
  2. Clique em Criar um recurso e pesquise um trabalho do Stream Analytics. Selecione o bloco Trabalho do Stream Analytics e selecione Criar.
  3. Na página Novo trabalho do Stream Analytics, siga estas etapas:
    1. Em Assinatura, selecione a assinatura que contém o namespace dos Hubs de Eventos.

    2. Em Grupo de recursos, escolha o grupo de recursos criados anteriormente.

    3. Na seção Detalhes da instância, em Nome, insira um nome exclusivo para o trabalho do Stream Analytics.

    4. Em Região, selecione a região na qual deseja criar o trabalho do Stream Analytics. Recomendamos colocar o trabalho e o hub de eventos na mesma região para melhor desempenho e para que você não precise pagar para transferir dados entre regiões.

    5. Em Ambiente de hospedagem<, selecione Nuvem, caso ainda não esteja selecionado. Os trabalhos do Stream Analytics podem ser implantados na nuvem ou na borda. O Cloud permite que você implante no Azure Cloud, e o Edge permite que você implante em um dispositivo IoT Edge.

    6. Em Unidades de streaming, selecione 1. As unidades de streaming representam os recursos de computação necessários para executar um trabalho. Por padrão, esse valor é definido como 1. Para saber mais sobre como dimensionar unidades de streaming, confira o artigo Entendendo e ajustando as unidades de streaming.

    7. Selecione Revisar + criar na parte inferior da página.

      Screenshot that shows the Create Azure Stream Analytics job page.

  4. Na página Revisar + criar, revise as configurações e selecione Criar para criar um trabalho do Stream Analytics.
  5. Depois que o trabalho for implantado, selecione Acessar recurso para navegar até a página do trabalho do Stream Analytics.

Configurar entrada de trabalho

A próxima etapa é definir uma fonte de entrada para o trabalho ler os dados usando o hub de eventos criado na seção anterior.

  1. Na página do trabalho do Stream Analytics, na seção Topologia do Trabalho no menu à esquerda, selecione Entradas.

  2. Na página Entradas, selecione +Adicionar entrada e Hub de eventos.

    Screenshot showing the Input page for a Stream Analytics job.

  3. Na página Hub de eventos, siga estas etapas:

    1. Em Alias de entrada, insira CallStream. O alias de entrada é um nome amigável para identificar a entrada. O alias de entrada pode conter somente caracteres alfanuméricos, hifens e sublinhados e deve ter entre 3 e 63 caracteres.

    2. Em Assinatura, selecione a assinatura do Azure em que você criou o hub de eventos. O hub de eventos pode estar na mesma assinatura ou em uma diferente da do trabalho do Stream Analytics.

    3. Em Namespace dos Hubs de Eventos, selecione o namespace dos Hubs de Eventos criado na seção anterior. Todos os namespaces disponíveis na sua assinatura atual são listados no menu suspenso.

    4. Em Nome do hub de eventos, selecione o hub de eventos criado na seção anterior. Todos os hubs de eventos disponíveis no namespace selecionado são listados no menu suspenso.

    5. Em Grupo de consumidores do hub de eventos, mantenha a opção Criar selecionada para que um grupo de consumidores seja criado no hub de eventos. É recomendável usar um grupo de consumidores distinto para cada trabalho do Stream Analytics. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de consumidores $Default. Quando um trabalho contém uma autojunção ou várias entradas, no futuro, algumas entradas podem ser lidas por mais de um leitor. Essa situação afeta o número de leitores em um único grupo de consumidores.

    6. Em Modo de autenticação, selecione Cadeia de conexão. É mais fácil testar o tutorial com essa opção.

    7. Em Nome da política do hub de eventos, selecione Usar existente e escolha a política criada anteriormente.

    8. Escolha Salvar na parte inferior da página.

      Screenshot showing the Event Hubs configuration page for an input.

Criar uma instância de Cache do Azure para Redis

  1. Crie um cache no Cache do Azure para Redis usando as etapas descritas em Criar um cache.

  2. Depois de criar o cache, em Configurações, selecione Chaves de Acesso. Anote a Chave da cadeia de conexão primária.

    Screenshot showing the selection of the Access Key menu item.

Criar uma função no Azure Functions que pode gravar dados no Cache do Azure para Redis

  1. Consulte a seção Criar um aplicativo de funções da documentação do Azure Functions. Este exemplo foi criado em:

  2. Crie um aplicativo de funções HttpTrigger padrão no Visual Studio Code seguindo este tutorial. As seguintes informações são usadas: idioma: C#, runtime: .NET 6 (na função v4), modelo: HTTP trigger.

  3. Instale a biblioteca de clientes do Redis executando o seguinte comando em um terminal localizado na pasta do projeto:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Adicione os itens RedisConnectionString e RedisDatabaseIndex na seção Values do local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    O Índice do Banco de Dados Redis é o número de 0 a 15 que identifica o banco de dados na instância.

  5. Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    Quando o Stream Analytics recebe a exceção "Entidade de Solicitação HTTP muito grande" da função, ele reduz o tamanho dos lotes que envia para as funções. O código a seguir garante que o Stream Analytics não envia lotes muito grandes. Certifique-se de que os valores de contagem e tamanho máximo do lote usados na função sejam consistentes com os valores inseridos no portal do Stream Analytics.

  6. A função agora pode ser publicada no Azure.

  7. Abra a função no portal do Azure e defina as configurações do aplicativo para RedisConnectionString e RedisDatabaseIndex.

Atualizar o trabalho do Stream Analytics com a função como saída

  1. Abra seu trabalho do Stream Analytics no portal do Azure.

  2. Navegue até a função e selecione Visão Geral>Saídas>Adicionar. Para adicionar uma nova saída, selecione Azure Functions como a opção de coletor. O adaptador de saída do Functions tem as seguintes propriedades:

    Nome da propriedade Descrição
    Alias de saída Um nome amigável que você usa na consulta do trabalho para fazer referência à saída.
    Importar opção Você poderá usar a função da assinatura atual ou fornecer as configurações manualmente se a função estiver localizada em outra assinatura.
    Aplicativo de Funções Nome de seu aplicativo de funções.
    Função Nome da função em seu aplicativo de funções (nome da função run.csx).
    Tamanho Máximo do Lote Define o tamanho máximo de cada lote de saída, que é enviado para a função em bytes. Por padrão, esse valor é definido como 262.144 bytes (256 KB).
    Contagem Máxima do Lote Especifica o número máximo de eventos em cada lote enviado para a função. O valor padrão é 100. Essa propriedade é opcional.
    Chave Permite que você use uma função de outra assinatura. Forneça o valor da chave para acessar sua função. Essa propriedade é opcional.
  3. Forneça um nome para o alias de saída. Neste tutorial, ele é nomeado saop1, mas você pode usar qualquer nome de sua escolha. Preencha outros detalhes.

  4. Abra seu trabalho do Stream Analytics e atualize a consulta para o seguinte.

    Importante

    Se não nomear o coletor de saída como saop1, lembre-se alterá-lo na consulta.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Inicie o aplicativo telcodatagen.exe executando o seguinte comando na linha de comando. O comando usa o formato telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Inicie o trabalho do Stream Analytics.

  7. Na página Monitorar para sua função do Azure, você visualizará que a função é invocada.

    Screenshot showing the Monitor page for Azure Functions with function invocations.

  8. Na página Cache do Azure para Redis, selecione Métricas no menu à esquerda, adicione a métrica Gravação de Cache e defina a duração como última hora. Será visualizado o gráfico semelhante à imagem a seguir.

    Screenshot showing the Metrics page for your Azure Cache for Redis.

Verificar o Cache do Azure para Redis quanto aos resultados

Obter a chave de logs do Azure Functions

Primeiro, obtenha a chave de um registro inserido em Cache do Azure para Redis. No código, a chave é calculada na função do Azure, conforme mostrado no seguinte snippet de código:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Navegue até o portal do Azure e localize o aplicativo Azure Functions.

  2. Selecione Funções no menu esquerdo.

  3. Selecione HTTPTrigger1 na lista de funções.

  4. Selecione Monitor no menu à esquerda.

  5. Alterne para a guia Logs.

  6. Anote uma chave da mensagem informativa, conforme mostrado na captura de tela a seguir. Use essa chave para localizar o valor em Cache do Azure para Redis.

    Screenshot showing the Monitor Logs page for the Azure function.

Use a chave para localizar o registro em Cache do Azure para Redis

  1. Navegue até o portal do Azure e localize seu Cache do Azure para Redis. Selecione Console.

  2. Use Comandos do Cache do Azure para Redis para verificar se seus dados estão no Cache do Azure para Redis. (O comando usa o formato Get {key}.) Use a chave copiada dos logs do Monitor para a função do Azure (na seção anterior).

    Get "KEY-FROM-THE-PREVIOUS-SECTION"

    Este comando deve imprimir o valor para a chave especificada:

    Screenshot showing the Redis Cache console showing the output of the Get command.

Tratamento de erros e novas tentativas

Se ocorrer uma falha durante o envio de eventos para o Azure Functions, o Stream Analytics repetirá a maioria das operações. Todas as exceções http serão repetidas até serem bem-sucedidas, exceto para o erro http 413 (entidade grande demais). Um erro de entidade grande demais é tratado como um erro de dados que está sujeito à política de repetição ou remoção.

Observação

O tempo limite para solicitações HTTP do Stream Analytics para Azure Functions é definido como 100 segundos. Se o aplicativo do Azure Functions levar mais de 100 segundos para processar um lote, o Stream Analytics apresentará erro e tentará executar o lote novamente.

A repetição de tentativas após atingir o tempo limite pode levar à gravação de eventos duplicados no coletor de saída. Quando o Stream Analytics tenta executar novamente um lote com falha, ele tenta todos os eventos no lote. Por exemplo, considere um lote de 20 eventos que são enviados para o Azure Functions do Stream Analytics. Suponha que o Azure Functions leve 100 segundos para processar os 10 primeiros eventos nesse lote. Após os 100 segundos, o Stream Analytics suspende a solicitação, uma vez que não recebeu uma resposta positiva do Azure Functions, e outra solicitação é enviada para o mesmo lote. Os 10 primeiros eventos no lote são processados novamente pelo Azure Functions, o que causa uma duplicata.

Problemas conhecidos

No portal do Azure, quando você tenta redefinir o valor de Tamanho máximo do lote/Contagem máxima de lotes como vazio (padrão), o valor é alterado para o valor inserido anteriormente ao salvar. Nesse caso, insira manualmente os valores padrão para esses campos.

Atualmente, o uso de roteiros HTTP em seu Azure Functions não é compatível com o Stream Analytics.

O suporte para se conectar ao Azure Functions hospedado em uma rede virtual não está habilitado.

Limpar os recursos

Quando não forem mais necessários, exclua o grupo de recursos, o trabalho de streaming e todos os recursos relacionados. A exclusão do trabalho evita a cobrança das unidades de streaming consumidas por ele. Se você está planejando usar o trabalho no futuro, pode interrompê-lo e reiniciar mais tarde, quando necessário. Se você não pretende continuar usando este trabalho, exclua todos os recursos criados por este início rápido seguindo estas etapas:

  1. No menu à esquerda no Portal do Azure, selecione Grupos de recursos e selecione o nome do recurso criado.
  2. Em sua página de grupo de recursos, selecione Excluir, digite o nome do recurso para excluir na caixa de texto e selecione Excluir.

Próximas etapas

Neste tutorial, você criou um trabalho simples do Stream Analytics que executa uma função do Azure. Para saber mais sobre trabalhos do Stream Analytics, prossiga para o seguinte tutorial: