Como ingerir dados usando o Azure Stream Analytics no Azure Cosmos DB for PostgreSQL

APLICA-SE A: Azure Cosmos DB for PostgreSQL (da plataforma da extensão de dados Citus para PostgreSQL)

O Azure Stream Analytics é um mecanismo de processamento de eventos e análise em tempo real desenvolvido para processar altos volumes de dados de streaming rápidos a partir de dispositivos, sensores e sites da Web. Ele também está disponível no runtime do Azure IoT Edge, habilitando o processamento de dados em dispositivos IoT.

Diagrama que mostra a arquitetura do Stream Analytics com o Azure Cosmos DB for PostgreSQL.

O Azure Cosmos DB for PostgreSQL se destaca em cargas de trabalho em tempo real, como IoT. Para essas cargas de trabalho, o Stream Analytics pode atuar como alternativa sem código, com desempenho e escalonável para pré-processar e transmitir dados dos Hubs de Eventos do Azure, do Hub IoT do Azure e do Armazenamento de Blobs do Azure para o Azure Cosmos DB for PostgreSQL.

Etapas para configurar o Stream Analytics

Observação

Este artigo usa o Hub IoT do Azure como uma fonte de dados de exemplo, mas a técnica é aplicável a qualquer outra fonte com suporte do Stream Analytics. Além disso, os dados de demonstração a seguir são provenientes do Simulador de telemetria de dispositivo de Internet das Coisas do Azure. Este artigo não aborda a configuração do simulador.

  1. No portal do Azure, expanda o menu do portal no canto superior esquerdo e selecione Criar um recurso.

  2. Selecione Analytics>Trabalho do Stream Analytics na lista de resultados.

  3. Preencha a página Novo trabalho do Stream Analytics com as seguintes informações:

    • Assinatura - Selecione a assinatura do Azure que deseja usar para este trabalho.
    • Grupo de recursos – Selecione o mesmo grupo de recursos do hub IoT.
    • Nome – Insira um nome para identificar o trabalho do Stream Analytics.
    • Região – Selecione a região do Azure para hospedar o trabalho do Stream Analytics. Use a localização geográfica mais próxima dos usuários para melhorar o desempenho e reduzir o custo de transferência dos dados.
    • Ambiente de hospedagem – Selecione Nuvem para implantar na nuvem do Azure ou Edge para implantar em um dispositivo de IoT Edge.
    • Unidades de streaming – Selecione o número de unidades de streaming para os recursos de computação necessários para executar o trabalho.
  4. Selecione Examinar + Criar e, em seguida, selecione Criar. Você deve ver uma notificação de Implantação em andamento no canto superior direito.

    Captura de tela que mostra o formulário Criar trabalho do Stream Analytics.

  5. Configure a entrada de trabalho.

    Captura de tela que mostra como configurar a entrada de trabalho no Stream Analytics.

    1. Depois que a implantação do recurso for concluída, navegue até o trabalho do Stream Analytics. Selecione Entradas>Adicionar entrada de fluxo>Hub IoT.

    2. Preencha a página Hub IoT com os seguintes valores:

      • Alias de entrada – Insira um nome para identificar a entrada de trabalho.
      • Assinatura – Selecione a assinatura do Azure que tem a conta do Hub IoT.
      • Hub IoT – Selecione o nome do hub IoT.
    3. Clique em Salvar.

    4. Depois que o fluxo de entrada for adicionado, você também poderá verificar ou baixar o conjunto de dados que está fluindo. O código a seguir mostra os dados de um evento de exemplo:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. Configure a saída de trabalho.

    1. Na página de trabalho do Stream Analytics, selecione Saídas>Adicionar>banco de dados PostgreSQL (versão prévia).

      Captura de tela que mostra como selecionar a saída do banco de dados PostgreSQL.

    2. Preencha a página do PostgreSQL do Azure com os seguintes valores:

      • Alias de saída – Insira um nome para identificar a saída de trabalho.
      • Selecione Fornecer configurações do banco de dados PostgreSQL manualmente e insira Nome de domínio totalmente qualificado do servidor, Banco de dados, Tabela, Nome de usuário e Senha. No conjunto de dados de exemplo, use a tabela device_data.
    3. Clique em Salvar.

    Configurar a saída do trabalho no Azure Stream Analytics.

  7. Defina a consulta de transformação.

    Consulta de transformação no Azure Stream Analytics.

    1. Na página de trabalho do Stream Analytics, selecione Consulta no menu esquerdo.

    2. Para este tutorial, você ingerirá apenas os eventos alternativos de Hub IoT no Azure Cosmos DB for PostgreSQL, para reduzir o tamanho geral dos dados. Copie e cole a seguinte consulta no painel de consulta:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. Selecione Salvar consulta.

      Observação

      Você usará a consulta não apenas para amostrar os dados, mas também para extrair os atributos desejados do fluxo de dados. A opção de consulta personalizada com o Stream Analytics é útil no pré-processamento/transformação dos dados, antes que sejam ingeridos no banco de dados.

  8. Inicie o trabalho do Stream Analytics e verifique a saída.

    1. Retorne para a página de visão geral do trabalho e selecione Iniciar.

    2. Na página Iniciar trabalho, selecione Agora para Hora de início da saída de trabalho e, em seguida, selecione Iniciar.

    3. O trabalho demora um pouco para começar na primeira vez, mas, uma vez disparado, continua a ser executado, à medida que os dados chegam. Após alguns minutos, você pode consultar o cluster para verificar se os dados foram carregados.

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

Observação

No momento, o recurso Conexão de Teste não é compatível com o Azure Cosmos DB for PostgreSQL e pode gerar um erro, mesmo quando a conexão funciona corretamente.

Próximas etapas

Saiba como criar um painel em tempo real com o Azure Cosmos DB for PostgreSQL.