Agregar dados em um pipeline de processador de dados
Importante
O recurso Pré-visualização de Operações do Azure IoT — habilitado pelo Azure Arc — está atualmente em VERSÃO PRÉVIA. Você não deve usar esse software em versão prévia em ambientes de produção.
Você precisará implantar uma nova instalação do Azure IoT Operations quando uma versão geralmente disponível for disponibilizada, você não poderá atualizar uma instalação de visualização.
Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral.
O estágio de agregação é um estágio de pipeline opcional, configurável e intermediário que permite executar operações de envio em lote e redução de amostragem em streaming de dados do sensor em janelas de tempo definidas pelo usuário.
Use um estágio de agregação para acumular mensagens em uma janela definida e calcular valores de agregação de propriedades nas mensagens. O estágio emite os valores agregados como propriedades em uma única mensagem no final de cada janela de tempo.
- Cada partição de pipeline realiza a agregação independentemente uma da outra.
- A saída do estágio é uma única mensagem que contém todas as propriedades de agregação definidas.
- O estágio descarta todas as outras propriedades. No entanto, você pode usar as funções Last, First ou Collect para preservar propriedades que, de outra forma, seriam descartadas pelo estágio durante a agregação.
- Para que o estágio de agregação funcione, o estágio da fonte de dados no pipeline deve desserializar a mensagem de entrada.
Pré-requisitos
Para configurar e usar um estágio de pipeline agregado, você precisa de uma instância implantada do processador de dados que inclua o componente opcional do processador de dados.
Configurar o estágio
A configuração JSON do estágio agregado define os detalhes do estágio. Para criar a fase, você pode interagir com a interface do usuário baseada em formulários ou fornecer a configuração JSON na guia Avançado:
Campo | Type | Descrição | Obrigatório | Padrão | Exemplo |
---|---|---|---|---|---|
Nome | String | Um nome a ser mostrado na interface do usuário do processador de dados. | Sim | - | Calculate Aggregate |
Descrição | Cadeia de caracteres | Uma descrição amigável do que o estágio de agregação faz. | Não | Aggregation over temperature |
|
Janela de tempo | A Duração que especifica o período durante o qual a agregação é executada. | Sim | - | 10s |
|
Propriedades > Função | Enumeração | A função de agregação a ser usada. | Sim | - | Sum |
Propriedades > InputPath1 | Caminho | O Caminho para a propriedade na mensagem de entrada à qual aplicar a função. | Sim | - | .payload.temperature |
Propriedades > OutputPath2 | Caminho | O Caminho para o local na mensagem de saída para colocar o resultado. | Sim | - | .payload.temperature.average |
Você pode definir várias configurações de Propriedades em um estágio de agregação. Por exemplo, calcule a soma da temperatura e calcule a média da pressão.
1Caminho de entrada:
- O tipo de dados do valor da propriedade do caminho de entrada deve ser compatível com o tipo de função definida.
- Você pode fornecer o mesmo caminho de entrada entre várias configurações de agregação para calcular várias funções na mesma propriedade do caminho de entrada. Verifique se os caminhos de saída são diferentes para evitar a substituição dos resultados.
2Caminho de saída:
- Os caminhos de saída podem ser iguais ou diferentes do caminho de entrada. Use caminhos de saída diferentes se você estiver calculando várias agregações na mesma propriedade do caminho de entrada.
- Configure caminhos de saída distintos para evitar a substituição de valores agregados.
Windows
A janela é o intervalo de tempo sobre o qual o estágio acumula mensagens. No final da janela, o estágio aplica a função configurada às propriedades da mensagem. Em seguida, o estágio emite uma única mensagem.
Atualmente, o estágio só dá suporte a janelas em cascata.
As janelas em cascata são uma série de intervalos de tempo de tamanho fixo, não sobrepostos e consecutivos. A janela começa e termina em pontos fixos no tempo:
O tamanho da janela define o intervalo de tempo sobre o qual o estágio acumula as mensagens. Você define o tamanho da janela usando o padrão comum Duração.
Funções
O estágio de agregação dá suporte às seguintes funções para calcular valores agregados sobre a propriedade de mensagem definida no caminho de entrada:
Função | Descrição |
---|---|
Somar | Calcula a soma dos valores da propriedade nas mensagens de entrada. |
Média | Calcula a média dos valores da propriedade nas mensagens de entrada. |
Contagem | Conta o número de vezes que a propriedade aparece na janela. |
Min | Calcula o valor mínimo dos valores da propriedade nas mensagens de entrada. |
Max | Calcula o valor máximo dos valores da propriedade nas mensagens de entrada. |
Último | Retorna o valor mais recente dos valores da propriedade nas mensagens de entrada. |
First | Retorna o primeiro valor dos valores da propriedade nas mensagens de entrada. |
Coletar | Retorna todos os valores da propriedade nas mensagens de entrada. |
A tabela a seguir lista os tipos de dados de mensagem compatíveis com cada função:
Função | Inteiro | Flutuante | String | Datetime | Array | Objeto | Binário |
---|---|---|---|---|---|---|---|
Soma | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Média | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Contagem | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Min | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Max | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Último | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
First | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Coletar | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Exemplo de configuração
O exemplo JSON a seguir mostra uma configuração de estágio de agregação completa:
{
"displayName":"downSample",
"description":"Calculate average for production tags",
"window":
{
"type":"tumbling",
"size":"10s"
},
"properties":
[
{
"function":"average",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_avg"
},
{
"function":"collect",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_all"
},
{
"function":"average",
"inputPath":".payload.pressure",
"outputPath":".payload.pressure"
},
{
"function":"last",
"inputPath":".systemProperties",
"outputPath": ".systemProperties"
}
]
}
A configuração define um estágio de agregação que calcula, em uma janela de dez segundos:
- Temperatura média
- Soma da temperatura
- Soma da pressão
Exemplo
Este exemplo inclui duas mensagens de entrada de exemplo e uma mensagem de saída de exemplo gerada usando a configuração anterior:
Mensagem de entrada 1:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 10,
"temperature":250,
"pressure":30,
"runningState": true
}
}
Mensagem de entrada 2:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 11,
"temperature":235,
"pressure":25,
"runningState": true
}
}
Mensagem de saída:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"payload":{
"temperature_avg":242.5,
"temperature_all":[250,235],
"pressure":27.5
}
}