Atividade de Fluxo de Dados no Azure Data Factory e no Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Use a atividade Fluxo de Dados para transformar e mover dados por meio de fluxos de dados de mapeamento. Se você for novo com fluxos de dados, confira Visão geral do fluxo de dados de mapeamento

Criar uma atividade de Fluxo de Dados com a interface do usuário

Para usar uma atividade Fluxo de Dados em um pipeline, conclua as etapas a seguir:

  1. Procure Fluxo de Dados no painel Atividades do pipeline e arraste uma atividade Fluxo de Dados para a tela do pipeline.

  2. Selecione a nova atividade Fluxo de Dados na tela, se ainda não estiver selecionada, e a guia Configurações para editar os detalhes.

    Monitorar uma atividade de Fluxo de Dados.

  3. A chave do ponto de verificação é usada para definir o ponto de verificação quando o fluxo de dados é usado para captura de dados de alterações. Você pode substituí-la. As atividades de fluxo de dados usam um valor de guid como chave do ponto de verificação em vez de "nome do pipeline + nome da atividade", para que sempre possam continuar acompanhando o estado da captura de dados de alterações do cliente, mesmo que haja ações de renomeação. Todas as atividades de fluxo de dados existentes usam a chave de padrão antiga para fins de compatibilidade com versões anteriores. A opção de chave de ponto de verificação depois de publicar uma nova atividade de fluxo de dados com o recurso de fluxo de dados habilitado para captura de dados de alterações é mostrada abaixo.

    Mostra a interface do usuário para uma atividade de Fluxo de Dados com chave de ponto de verificação.

  4. Selecione um fluxo de dados existente ou crie um novo usando o botão Novo. Selecione outras opções conforme necessário para concluir sua configuração.

Sintaxe

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Propriedades de tipo

Propriedade Descrição Valores permitidos Obrigatório
fluxo de dados A referência ao Fluxo de Dados sendo executado DataFlowReference Sim
integrationRuntime O ambiente de computação em que o fluxo de dados é executado. Se não for especificado, o runtime de integração de resolução automática do Azure será usado. IntegrationRuntimeReference Não
compute.coreCount O número de núcleos usados no cluster do Spark. Só poderá ser especificado se o runtime de integração do Azure de resolução automática for utilizado 8, 16, 32, 48, 80, 144, 272 Não
compute.computeType O tipo de computação utilizada no cluster do Spark. Só poderá ser especificado se o runtime de integração do Azure de resolução automática for utilizado "Geral" Não
staging.linkedService Se você estiver usando um coletor ou uma origem do Azure Synapse Analytics, especifique a conta de armazenamento utilizada para o processo de preparo do PolyBase.

Se o Armazenamento do Azure estiver configurado com o ponto de extremidade de serviço de VNet, você precisará usar a autenticação de identidade gerenciada com a opção “permitir serviço confiável da Microsoft” habilitada na conta de armazenamento. Confira Impacto de usar pontos de extremidade de serviço de VNet com Armazenamento do Azure. Além disso, aprenda as configurações necessárias para o Blob do Azure e o Azure Data Lake Storage Gen2, respectivamente.
LinkedServiceReference Somente se o fluxo de dados lê ou grava em um Azure Synapse Analytics
staging.folderPath Se você estiver usando um coletor ou uma origem do Azure Synapse Analytics, o caminho da pasta na conta de armazenamento de blobs utilizada para o processo de preparo do PolyBase String Somente se o fluxo de dados lê ou grava no Azure Synapse Analytics
traceLevel Definir o nível de registros em log da sua execução de atividade de fluxo de dados Fino, grosso, nenhum Não

Executar fluxo de dados

Dimensionar dinamicamente a computação de fluxo de dados em runtime

As propriedades Core Count e Compute Type podem ser definidas dinamicamente para ajustar o tamanho de seus dados de origem de entrada em runtime. Use atividades de pipeline como Lookup ou Get Metadata para localizar o tamanho dos dados do conjunto de dados de origem. Em seguida, use Add Dynamic Content nas propriedades de atividade do fluxo de dados. Você pode escolher tamanhos de computação pequenos, médios ou grandes. Opcionalmente, escolha "Personalizado" e configure os tipos de computação e o número de núcleos manualmente.

Fluxo de dados dinâmico

Este é um breve tutorial em vídeo explicando essa técnica

Runtime de integração do fluxo de dados

Escolha qual runtime de integração usar para a execução da atividade do fluxo de dados. Por padrão, o serviço usa o runtime de integração de resolução automática do Azure com quatro núcleos de trabalho. Esse IR tem um tipo de computação de uso geral e é executado na mesma região que a sua instância de serviço. Para pipelines operacionalizados, é altamente recomendável que você crie seus próprios Azure Integration Runtimes que definem regiões específicas, tipo de computação, contagens principais e TTL para a execução da atividade de fluxo de dados.

Um tipo de computação mínimo de Uso Geral com uma configuração de 8 + 8 (16 núcleos virtuais no total), e uma TTL (vida útil) de dez minutos é a recomendação mínima para a maioria das cargas de trabalho de produção. Ao definir um pequeno TTL, o Azure IR pode manter um cluster quente que não incorrerá em vários minutos de tempo de início para um cluster frio. Para obter mais informações, confira Azure Integration Runtime.

Azure Integration Runtime

Importante

A seleção do Integration Runtime na atividade Fluxo de Dados se aplica somente às execuções disparadas do pipeline. A depuração do pipeline com fluxos de dados é executada no cluster especificado na sessão de depuração.

PolyBase

Se você estiver usando um Azure Synapse Analytics como um coletor ou origem, deverá escolher um local de preparo para o carregamento em lotes do PolyBase. O PolyBase permite o carregamento em lotes em massa em vez de carregar os dados linha por linha. O PolyBase reduz bastante o tempo de carregamento no Azure Synapse Analytics.

Chave do ponto de verificação

Ao usar a opção de captura de alterações para fontes de fluxo de dados, o ADF mantém e gerencia o ponto de verificação para você automaticamente. A chave de ponto de verificação padrão é um hash do nome do fluxo de dados e do nome do pipeline. Se você estiver usando um padrão dinâmico para suas tabelas ou pastas de origem, talvez queira substituir esse hash e definir seu valor de chave de ponto de verificação aqui.

Nível de log

Se não precisar de cada execução de pipeline das suas atividades de fluxo de dados para registrar completamente todos os logs de telemetria detalhados, você pode definir seu nível de registros em log como "Básico" ou "Nenhum". Ao executar os fluxos de dados no modo “Detalhado” (padrão), você solicita que o serviço registre totalmente a atividade em cada nível de partição individual durante a transformação dos dados. Isso pode ser uma operação dispendiosa. Portanto, habilitar o modo detalhado somente ao solucionar problemas pode melhorar o fluxo geral de dados e o desempenho do pipeline. O modo "Básico" só registra as durações de transformação e "Nenhum" fornece apenas um resumo das durações.

Nível de log

Propriedades do coletor

O recurso de agrupamento em fluxos de dados permite que você defina a ordem de execução de seus coletores, bem como dos coletores de grupo juntos usando o mesmo número de grupo. Para ajudar a gerenciar os grupos, você pode pedir que o serviço execute coletores no mesmo grupo em paralelo. Você também pode definir o grupo de coletores para continuar mesmo depois que um deles encontrar um erro.

O comportamento padrão dos coletores de fluxo de dados é executar cada coletor sequencialmente, em série, e fazer com que o fluxo de dados falhe quando um erro for encontrado no coletor. Além disso, todos os coletores são padronizados para o mesmo grupo, a menos que você acesse as propriedades de fluxo de dados e defina prioridades diferentes para os coletores.

Propriedades do coletor

Primeira linha apenas

Essa opção só está disponível para fluxos de dados que têm coletores de cache habilitados para "Saída para atividade". A saída do fluxo de dados que é injetada diretamente em seu pipeline é limitada a 2 MB. A configuração "primeira linha somente" ajuda a limitar a saída de dados do fluxo de dados ao injetar a saída da atividade de fluxo de dados diretamente em seu pipeline.

Parametrização de fluxos de dados

Conjuntos de dados parametrizados

Se o fluxo de dados usar conjuntos de dados parametrizados, defina os valores de parâmetro na guia Configurações.

Executar parâmetros de Fluxo de Dados

Fluxos de dados parametrizados

Se o fluxo de dados for parametrizado, defina os valores dinâmicos dos parâmetros de fluxo de dados na guia Parâmetros. Você pode usar tanto a linguagem de expressão de pipeline como a linguagem de expressão do fluxo de dados para atribuir valores de parâmetros dinâmicos ou literais. Para obter mais informações, confira Parâmetros de Fluxo de Dados.

Propriedades de computação parametrizadas.

Você pode parametrizar a contagem de núcleos ou o tipo de computação se usar o Azure Integration Runtime de resolução automática e especificar valores para compute.coreCount e compute.computeType.

Executar exemplo de parâmetro de fluxo de dados

Depuração de pipeline da atividade de Fluxo de Dados

Para uma execução de pipeline de depuração com uma atividade de Fluxo de Dados, você deve alternar o modo de depuração de fluxo de dados por meio do controle deslizante Depuração de Fluxo de Dados na barra superior. O modo de depuração permite executar o fluxo de dados em um cluster do Spark ativo. Para saber mais, consulte Modo de depuração.

Captura de tela que mostra onde é o botão Depuração

O pipeline de depuração é executado no cluster de depuração ativa, não no ambiente do runtime de integração especificado nas configurações de atividade do Fluxo de Dados. Você pode escolher o ambiente de computação de depuração ao iniciar o modo de depuração.

Monitoramento da atividade de Fluxo de Dados

A atividade de Fluxo de Dados tem uma experiência de monitoramento especial, na qual você pode exibir informações sobre particionamento, tempo de preparo e linhagem de dados. Abra o painel de monitoramento usando o ícone de óculos em Ações. Para saber mais, confira Monitoramento de fluxos de dados.

Usar resultados da atividade de Fluxo de Dados em uma atividade subsequente

A atividade de fluxo de dados gera métricas referentes ao número de linhas gravadas em cada coletor e linhas lidas de cada fonte. Esses resultados são retornados na seção output do resultado da execução de atividade. As métricas retornadas estão no formato do JSON abaixo.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Por exemplo, para obter o número de linhas gravadas em um coletor chamado 'sink1' em uma atividade denominada 'dataflowActivity', use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Para obter o número de linhas lidas de uma fonte denominada 'source1' que foi utilizada nesse coletor, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Observação

Se um coletor tiver zero linhas gravadas, ele não aparecerá nas métricas. A existência pode ser verificada usando a função contains. Por exemplo, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') verifica se alguma linha foi gravada em sink1.

Confira as atividades de fluxo de controle compatíveis: