Tutorial: Executar o Azure Functions a partir de trabalhos do Azure Stream Analytics

Neste tutorial, você cria um trabalho do Azure Stream Analytics que lê eventos dos Hubs de Eventos do Azure, executa uma consulta nos dados do evento e invoca uma função do Azure, que grava em uma instância do Cache do Azure para Redis.

Captura de tela que mostra a relação entre os serviços do Azure na solução.

Nota

  • Você pode executar o Azure Functions a partir do Azure Stream Analytics configurando o Functions como um dos coletores (saídas) para o trabalho do Stream Analytics. As Funções são uma experiência de cálculo a pedido orientada por eventos que lhe permite implementar o código que é acionado pelos eventos que ocorrem no Azure ou em serviços de terceiros. Esta capacidade que as Funções têm de responder a acionadores torna-as numa saída natural para as tarefas do Stream Analytics.
  • O Stream Analytics invoca Funções através de acionadores HTTP. O adaptador de saída das Funções permite aos utilizadores ligarem Funções ao Stream Analytics, para que os eventos possam ser acionados com base em consultas do Stream Analytics.
  • Não há suporte para conexão com o Azure Functions dentro de uma rede virtual (VNet) de um trabalho do Stream Analytics em execução em um cluster multilocatário.

Neste tutorial, irá aprender a:

  • Criar uma instância dos Hubs de Eventos do Azure
  • Criar uma instância do Cache do Azure para Redis
  • Criar uma Função do Azure
  • Criar uma tarefa do Stream Analytics
  • Configurar o hub de eventos como entrada e função como saída
  • Executar a tarefa do Stream Analytics
  • Verifique os resultados do Cache Redis do Azure

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

Antes de começar, certifique-se de que concluiu os seguintes passos:

Iniciar sessão no Azure

Inicie sessão no portal do Azure.

Criar um hub de eventos

Você precisa enviar alguns dados de exemplo para um hub de eventos antes que o Stream Analytics possa analisar o fluxo de dados de chamadas fraudulentas. Neste tutorial, você envia dados para o Azure usando os Hubs de Eventos do Azure.

Use as seguintes etapas para criar um hub de eventos e enviar dados de chamada para esse hub de eventos:

  1. Inicie sessão no portal do Azure.

  2. Selecione Todos os serviços no menu à esquerda, selecione Internet das coisas, passe o mouse sobre Hubs de Eventos e selecione o botão + (Adicionar ).

    Captura de tela mostrando a página de criação de Hubs de Eventos.

  3. Na página Criar namespace, execute estas etapas:

    1. Selecione uma assinatura do Azure onde você deseja criar o hub de eventos.

    2. Em Grupo de recursos, selecione Criar novo e insira um nome para o grupo de recursos. O namespace Hubs de Eventos é criado neste grupo de recursos.

    3. Em Nome do namespace, insira um nome exclusivo para o namespace Hubs de Eventos.

    4. Em Local, selecione a região na qual você deseja criar o namespace.

    5. Em Nível de preço, selecione Padrão.

    6. Selecione Rever + criar na parte inferior da página.

      Captura de tela mostrando a página Criar namespace.

    7. Na página Revisão + criação do assistente de criação de namespace, selecione Criar na parte inferior da página depois de revisar todas as configurações.

  4. Depois que o namespace for implantado com êxito, selecione Ir para o recurso para navegar até a página Namespace dos Hubs de Eventos .

  5. Na página Namespace de Hubs de Eventos , selecione +Hub de Eventos na barra de comandos.

    Captura de tela mostrando o botão Adicionar hub de eventos na página Namespace de Hubs de Eventos.

  6. Na página Criar Hub de Eventos, insira um Nome para o hub de eventos. Defina a contagem de partições como 2. Use as opções padrão nas configurações restantes e selecione Revisar + criar.

    Captura de tela mostrando a página Criar hub de eventos.

  7. Na página Rever + criar, selecione Criar na parte inferior da página. Em seguida, aguarde até que a implementação seja executada com êxito.

Conceder acesso ao hub de eventos e obter uma cadeia de ligação

Antes que um aplicativo possa enviar dados para os Hubs de Eventos do Azure, o hub de eventos deve ter uma política que permita o acesso. A política de acesso produz uma cadeia de ligação que inclui as informações de autorização.

  1. Na página Namespace de Hubs de Eventos , selecione Políticas de acesso compartilhado no menu à esquerda.

  2. Selecione RootManageSharedAccessKey na lista de políticas.

  3. Em seguida, selecione o botão de cópia ao lado de Cadeia de conexão - chave primária.

  4. Cole a cadeia de ligação num editor de texto. Vai precisar desta cadeia de ligação na secção seguinte.

    A cadeia de ligação é semelhante à seguinte:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Observe que a cadeia de conexão contém vários pares chave-valor separados por ponto-e-vírgula: Endpoint, SharedAccessKeyName e SharedAccessKey.

Iniciar a aplicação geradora de eventos

Antes de iniciar a aplicação TelcoGenerator, deve configurá-la para enviar dados para os Hubs de Eventos do Azure que criou anteriormente.

  1. Extraia o conteúdo do ficheiro TelcoGenerator.zip.

  2. Abra o TelcoGenerator\TelcoGenerator\telcodatagen.exe.config arquivo em um editor de texto de sua escolha Há mais de um .config arquivo, então certifique-se de abrir o correto.

  3. Atualize o elemento <appSettings> no ficheiro de configuração com os seguintes detalhes:

    • Defina o valor da chave EventHubName como o valor do EntityPath no final da cadeia de conexão.
    • Defina o valor da chave Microsoft.ServiceBus.ConnectionString para a cadeia de conexão para o namespace. Se você usar uma cadeia de conexão para um hub de eventos, não um namespace, remova EntityPath o valor (;EntityPath=myeventhub) no final. Não se esqueça de remover o ponto-e-vírgula que precede o valor EntityPath.
  4. Guarde o ficheiro.

  5. Em seguida, abra uma janela de comando e mude para a pasta onde deszipou a aplicação TelcoGenerator. Em seguida, introduza o seguinte comando:

    .\telcodatagen.exe 1000 0.2 2
    

    Este comando recebe os seguintes parâmetros:

    • Número de registos de dados de chamada por hora.
    • Percentagem de probabilidade de fraude, que corresponde à frequência com que a aplicação deve simular uma chamada fraudulenta. O valor 0,2 significa que cerca de 20% dos registos de chamadas parecem fraudulentos.
    • Duração em horas, que corresponde ao número de horas que a aplicação deve ser executada. Você também pode parar o aplicativo a qualquer momento, encerrando o processo (Ctrl+C) na linha de comando.

    Após alguns segundos, a aplicação começa a apresentar registos de chamadas telefónicas no ecrã, à medida que os envia para o hub de eventos. Os dados das chamadas telefónicas contêm os seguintes campos:

    Registo Definição
    CallrecTime O carimbo de data/hora da hora de início da chamada.
    SwitchNum O comutador de telefone utilizado para estabelecer a chamada. Neste exemplo, as opções são cadeias de caracteres que representam o país/região de origem (EUA, China, Reino Unido, Alemanha ou Austrália).
    CallingNum O número de telefone do chamador.
    CallingIMSI A Identidade Internacional de Assinante Móvel (IMSI). É um identificador exclusivo do chamador.
    CalledNum O número de telefone do destinatário da chamada.
    CalledIMSI Identidade Internacional de Assinante Móvel (IMSI). É um identificador exclusivo do destinatário da chama.

Criar uma tarefa do Stream Analytics

Agora que tem um fluxo de eventos de chamada, pode criar uma tarefa do Stream Analytics que lê os dados do hub de eventos.

  1. Para criar uma tarefa do Stream Analytics, navegue até ao portal do Azure.
  2. Selecione Criar um recurso e procure um trabalho do Stream Analytics. Selecione o bloco de trabalho do Stream Analytics e selecione Criar.
  3. Na página de trabalho Novo Stream Analytics , siga estas etapas:
    1. Em Assinatura, selecione a assinatura que contém o namespace Hubs de Eventos.

    2. Em Grupo de recursos, selecione o grupo de recursos criado anteriormente.

    3. Na seção Detalhes da instância, Para Nome, insira um nome exclusivo para o trabalho do Stream Analytics.

    4. Em Região, selecione a região na qual você deseja criar o trabalho do Stream Analytics. Recomendamos que você coloque o trabalho e o hub de eventos na mesma região para obter o melhor desempenho e para não pagar para transferir dados entre regiões.

    5. Em Ambiente de hospedagem< , selecione Nuvem se ainda não estiver selecionado. As tarefas do Stream Analytics podem ser implementadas na cloud ou no Edge. A nuvem permite que você implante na Nuvem do Azure e o Edge permite que você implante em um dispositivo IoT Edge.

    6. Em Unidades de streaming, selecione 1. As unidades de transmissão em fluxo representam os recursos informáticos que são necessários para executar uma tarefa. Por predefinição, este valor está definido como 1. Para saber mais sobre o dimensionamento de unidades de transmissão em fluxo, veja o artigo Compreender e ajustar as unidades de transmissão em fluxo.

    7. Selecione Rever + criar na parte inferior da página.

      Captura de tela que mostra a página de trabalho Criar o Azure Stream Analytics.

  4. Na página Rever + criar, reveja as definições e, em seguida, selecione Criar para criar o trabalho do Stream Analytics.
  5. Depois que o trabalho for implantado, selecione Ir para o recurso para navegar até a página de trabalho do Stream Analytics.

Configurar a entrada da tarefa

A próxima etapa consiste em definir uma origem de entrada para a tarefa ler dados com o hub de eventos que criou na secção anterior.

  1. Na página de trabalho do Stream Analytics, na seção Topologia de trabalho no menu à esquerda, selecione Entradas.

  2. Na página Entradas, selecione + Adicionar entrada e Hub de eventos.

    Captura de tela mostrando a página Entrada para um trabalho do Stream Analytics.

  3. Na página Hub de eventos, siga estas etapas:

    1. Em Alias de entrada, digite CallStream. Alias de entrada é um nome amigável para identificar sua entrada. O alias de entrada só pode conter carateres alfanuméricos, hífenes e carateres de sublinhado, e tem de ter entre 3 e 63 carateres.

    2. Em Assinatura, selecione a assinatura do Azure onde você criou o hub de eventos. O hub de eventos pode estar na mesma subscrição ou numa subscrição diferente da tarefa do Stream Analytics.

    3. Para namespace Hubs de Eventos, selecione o namespace Hubs de Eventos que você criou na seção anterior. Todos os namespaces disponíveis em sua assinatura atual estão listados na lista suspensa.

    4. Em Nome do hub de eventos, selecione o hub de eventos criado na seção anterior. Todos os hubs de eventos disponíveis no namespace selecionado são listados na lista suspensa.

    5. Para o grupo de consumidores do hub de eventos, mantenha a opção Criar novo selecionada para que um novo grupo de consumidores seja criado no hub de eventos. Recomendamos que você use um grupo de consumidores distinto para cada trabalho do Stream Analytics. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de $Default consumidores. Quando um trabalho contém uma associação automática ou tem várias entradas, algumas entradas posteriores podem ser lidas por mais de um leitor. Esta situação afeta o número de leitores num único grupo de consumidores.

    6. Em Modo de autenticação, selecione Cadeia de conexão. É mais fácil testar o tutorial com esta opção.

    7. Para Nome da política do hub de eventos, selecione Usar existente e selecione a política criada anteriormente.

    8. Selecione Salvar na parte inferior da página.

      Captura de tela mostrando a página de configuração dos Hubs de Eventos para uma entrada.

Criar uma instância do Cache do Azure para Redis

  1. Crie um cache no Cache Redis do Azure usando as etapas descritas em Criar um cache.

  2. Depois de criar a cache em Definições, selecione Chaves de Acesso. Anote a Cadeia de ligação primária.

    Captura de tela mostrando a seleção do item de menu Chave de Acesso.

Criar uma função no Azure Functions que possa gravar dados no Cache Redis do Azure

  1. Veja a secção Criar uma aplicação de funções na documentação das Funções. Esta amostra baseou-se em:

  2. Crie um aplicativo de função HttpTrigger padrão no Visual Studio Code seguindo este tutorial. As seguintes informações são usadas: linguagem: C#, tempo de execução: .NET 6 (na função v4), modelo: HTTP trigger.

  3. Instale a biblioteca de cliente Redis executando o seguinte comando em um terminal localizado na pasta do projeto:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Adicione os RedisConnectionString itens e RedisDatabaseIndex na Values seção do seu local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    O Índice do Banco de Dados Redis é o número de 0 a 15 que identifica o banco de dados na instância.

  5. Substitua toda a função (arquivo .cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    Quando o Stream Analytics recebe a exceção "HTTP Request Entity Too Large" da função, ele reduz o tamanho dos lotes que envia para as funções. O código a seguir garante que o Stream Analytics não envie lotes superdimensionados. Certifique-se de que os valores de contagem e tamanho de lote máximo utilizados na função são consistentes com os valores introduzidos no portal do Stream Analytics.

  6. A função agora pode ser publicada no Azure.

  7. Abra a função no portal do Azure e defina as configurações do aplicativo para RedisConnectionString e RedisDatabaseIndex.

Atualizar a tarefa do Stream Analytics com a função como saída

  1. Abra a sua tarefa do Stream Analytics no portal do Azure.

  2. Navegue para a sua função e selecione Descrição Geral>Saídas>Adicionar. Para adicionar uma nova saída, selecione Função do Azure para a opção de sink. O adaptador de saída Functions tem as seguintes propriedades:

    Nome da propriedade Descrição
    Alias de saída Um nome de utilizador amigável que utiliza na consulta da tarefa para a referência de saída.
    Opção de importação Pode utilizar a função da subscrição atual ou fornecer as definições manualmente se a função estiver localizada noutra subscrição.
    Aplicação de Funções Nome da sua aplicação de Funções.
    Function Nome da função na sua Aplicação de funções (nome da sua função run.csx).
    Tamanho Máx. de Lote Define o tamanho máximo para cada lote de saída, que é enviado para sua função em bytes. Por padrão, esse valor é definido como 262.144 bytes (256 KB).
    Contagem Máx. de Lotes Especifica o número máximo de eventos em cada lote que é enviado para a função. O valor predefinido é 100. Esta propriedade é opcional.
    Chave Permite-lhe utilizar uma função a partir de outra subscrição. Indique o valor da chave para aceder à sua função. Esta propriedade é opcional.
  3. Indique um nome para o alias de saída. Neste tutorial, ele é chamado saop1, mas você pode usar qualquer nome de sua escolha. Preencha outros detalhes.

  4. Abra a sua tarefa do Stream Analytics e atualize a consulta para o seguinte.

    Importante

    O script de exemplo a seguir pressupõe que você usou CallStream para o nome de entrada e saop1 para o nome de saída. Se você usou nomes diferentes, NÃO se esqueça de atualizar a consulta.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Inicie o aplicativo telcodatagen.exe executando o seguinte comando na linha de comando. O comando usa o formato telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Inicie a tarefa do Stream Analytics.

  7. Na página Monitor da sua função do Azure, você vê que a função é invocada.

    Captura de ecrã a mostrar a página Monitor do Azure Functions com invocações de funções.

  8. Na página Cache do Azure para Redis , seu cache, selecione Métricas no menu à esquerda, adicione a métrica de Gravação de Cache e defina a duração como última hora. Você vê o gráfico semelhante à imagem a seguir.

    Captura de ecrã a mostrar a página Métricas para a sua Cache do Azure para Redis.

Verifique os resultados do Cache Redis do Azure

Obter a chave dos logs do Azure Functions

Primeiro, obtenha a chave de um registro inserido no Cache do Azure para Redis. No código, a chave é calculada na função do Azure, conforme mostrado no seguinte trecho de código:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Navegue até o portal do Azure e localize seu aplicativo Azure Functions.

  2. Selecione Funções no menu à esquerda.

  3. Selecione HTTPTrigger1 na lista de funções.

  4. Selecione Monitor no menu à esquerda.

  5. Alterne para a guia Logs .

  6. Anote uma chave da mensagem informativa, conforme mostrado na captura de tela a seguir. Use essa chave para localizar o valor no Cache do Azure para Redis.

    Captura de ecrã a mostrar a página Registos do Monitor para a função do Azure.

Use a chave para localizar o registro no Cache Redis do Azure

  1. Navegue até o portal do Azure e localize seu Cache do Azure para Redis. Selecione Consola.

  2. Use os comandos do Cache do Azure para Redis para verificar se seus dados estão no Cache do Azure para Redis. (O comando usa o formato Get {key}.) Use a chave copiada dos logs do Monitor para a função do Azure (na seção anterior).

    Obter "KEY-FROM-THE-PREVIOUS-SECTION"

    Este comando deve imprimir o valor para a chave especificada:

    Captura de tela mostrando o console do Cache Redis mostrando a saída do comando Get.

Processamento de erros e tentativas

Se ocorrer uma falha ao enviar eventos para o Azure Functions, o Stream Analytics tentará novamente a maioria das operações. Todas as exceções http são repetidas até o sucesso, exceto para o erro http 413 (entidade muito grande). Um erro de entidade muito grande é tratado como um erro de dados que está sujeito à política de repetição ou descarte.

Nota

O tempo limite para solicitações HTTP do Stream Analytics para o Azure Functions é definido como 100 segundos. Se o seu aplicativo do Azure Functions demorar mais de 100 segundos para processar um lote, o Stream Analytics errará e repetirá o lote.

Repetir os tempos limite pode resultar em eventos duplicados gravados no coletor de saída. Quando o Stream Analytics tenta novamente para um lote com falha, ele tenta novamente para todos os eventos no lote. Por exemplo, considere um lote de 20 eventos que são enviados para o Azure Functions a partir do Stream Analytics. Suponha que o Azure Functions leva 100 segundos para processar os primeiros 10 eventos nesse lote. Após 100 segundos, o Stream Analytics suspende a solicitação, pois não recebeu uma resposta positiva do Azure Functions e outra solicitação é enviada para o mesmo lote. Os primeiros 10 eventos no lote são processados novamente pelo Azure Functions, o que causa uma duplicação.

Problemas conhecidos

No portal do Azure, quando tenta repor o Tamanho Máx. de Lote/Contagem Máx. de Lotes para vazio (predefinição), o valor regressa ao valor introduzido anteriormente após guardar. Introduza manualmente os valores predefinidos para estes campos neste caso.

O uso de roteamento HTTP em seu Azure Functions atualmente não é suportado pelo Stream Analytics.

O suporte para se conectar ao Azure Functions hospedado em uma rede virtual não está habilitado.

Clean up resources (Limpar recursos)

Quando já não for necessário, elimine o grupo de recursos, a tarefa de transmissão em fluxo e todos os recursos relacionados. A eliminação da tarefa evita a faturação das unidades de transmissão em fluxo consumidas pela tarefa. Se estiver a planear utilizar a tarefa no futuro, pode pará-la e reiniciá-la mais tarde, quando for necessário. Se você não vai continuar a usar esse trabalho, exclua todos os recursos criados por este início rápido usando as seguintes etapas:

  1. No menu do lado esquerdo no portal do Azure, selecione Grupos de recursos e, em seguida, selecione o nome do recurso que criou.
  2. Na página do grupo de recursos, selecione Eliminar, escreva o nome do recurso a eliminar na caixa de texto e, em seguida, selecione Eliminar.

Próximos passos

Neste tutorial, você criou um trabalho simples do Stream Analytics que executa uma Função do Azure. Para saber mais sobre tarefas do Stream Analytics, avance para o próximo tutorial: