Atividade de fluxo de dados no Azure Data Factory e no Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Use a atividade Fluxo de Dados para transformar e mover dados por meio de fluxos de dados de mapeamento. Se você é novo em fluxos de dados, consulte Visão geral do fluxo de dados de mapeamento

Criar uma atividade de fluxo de dados com a interface do usuário

Para usar uma atividade de Fluxo de Dados em um pipeline, conclua as seguintes etapas:

  1. Procure Fluxo de Dados no painel Atividades do pipeline e arraste uma atividade de Fluxo de Dados para a tela do pipeline.

  2. Selecione a nova atividade Fluxo de Dados na tela, se ainda não estiver selecionada, e a guia Configurações para editar seus detalhes.

    Mostra a interface do usuário de uma atividade de Fluxo de Dados.

  3. A chave de ponto de verificação é usada para definir o ponto de verificação quando o fluxo de dados é usado para captura de dados alterados. Você pode substituí-lo. As atividades de fluxo de dados usam um valor guid como chave de ponto de verificação em vez de "nome do pipeline + nome da atividade" para que ele possa sempre acompanhar o estado de captura de dados de alteração do cliente, mesmo que haja ações de renomeação. Toda a atividade de fluxo de dados existente usa a chave de padrão antiga para compatibilidade com versões anteriores. A opção de chave de ponto de verificação após a publicação de uma nova atividade de fluxo de dados com recurso de fluxo de dados habilitado para captura de dados de alteração é mostrada abaixo.

    Mostra a interface do usuário de uma atividade de fluxo de dados com chave de ponto de verificação.

  4. Selecione um fluxo de dados existente ou crie um novo usando o botão Novo. Selecione outras opções conforme necessário para concluir a configuração.

Sintaxe

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Propriedades do tipo

Property Description Valores permitidos Necessário
fluxo de dados A referência ao fluxo de dados que está sendo executado DataFlowReference Sim
integrationRuntime O ambiente de computação em que o fluxo de dados é executado. Se não for especificado, o tempo de execução de integração do Azure de autoresolução será usado. IntegrationRuntimeReference Não
compute.coreCount O número de núcleos usados no cluster de faísca. Só pode ser especificado se o tempo de execução da Integração do Azure de resolução automática for usado 8, 16, 32, 48, 80, 144, 272 Não
compute.computeType O tipo de computação usado no cluster de faísca. Só pode ser especificado se o tempo de execução da Integração do Azure de resolução automática for usado "Geral" Não
staging.linkedService Se você estiver usando uma fonte ou coletor do Azure Synapse Analytics, especifique a conta de armazenamento usada para o preparo do PolyBase.

Se o seu Armazenamento do Azure estiver configurado com o ponto de extremidade do serviço VNet, você deverá usar a autenticação de identidade gerenciada com "permitir serviço confiável da Microsoft" habilitado na conta de armazenamento, consulte Impacto do uso de Pontos de Extremidade de Serviço VNet com o armazenamento do Azure. Conheça também as configurações necessárias para o Blob do Azure e o Azure Data Lake Storage Gen2 , respectivamente.
LinkedServiceReference Somente se o fluxo de dados ler ou gravar em um Azure Synapse Analytics
staging.folderPath Se você estiver usando uma fonte ou coletor do Azure Synapse Analytics, o caminho da pasta na conta de armazenamento de blob usado para o preparo do PolyBase String Somente se o fluxo de dados ler ou gravar no Azure Synapse Analytics
traceLevel Definir o nível de registro da execução da atividade de fluxo de dados Bom, grosseiro, nenhum Não

Executar fluxo de dados

Dimensione dinamicamente a computação do fluxo de dados em tempo de execução

As propriedades Core Count e Compute Type podem ser definidas dinamicamente para se ajustarem ao tamanho dos dados de origem recebidos em tempo de execução. Use atividades de pipeline como Pesquisa ou Obter Metadados para encontrar o tamanho dos dados do conjunto de dados de origem. Em seguida, use Adicionar Conteúdo Dinâmico nas propriedades da atividade Fluxo de Dados. Você pode escolher tamanhos de computação pequenos, médios ou grandes. Opcionalmente, escolha "Personalizado" e configure os tipos de computação e o número de núcleos manualmente.

Fluxo dinâmico de dados

Aqui está um breve tutorial em vídeo explicando essa técnica:

Tempo de execução da integração do fluxo de dados

Escolha qual Tempo de Execução de Integração usar para a execução da atividade de Fluxo de Dados. Por predefinição, o serviço utiliza o Azure Integration Runtime de resolução automática com quatro núcleos de trabalho. Este IR tem um tipo de computação de uso geral e é executado na mesma região que sua instância de serviço. Para pipelines operacionalizados, é altamente recomendável que você crie seus próprios Tempos de Execução de Integração do Azure que definem regiões específicas, tipo de computação, contagens de núcleo e TTL para a execução da atividade de fluxo de dados.

Um tipo mínimo de computação de Propósito Geral com uma configuração de 8+8 (16 v-cores totais) e um tempo de vida (TTL) de 10 minutos é a recomendação mínima para a maioria das cargas de trabalho de produção. Ao definir um pequeno TTL, o IR do Azure pode manter um cluster quente que não incorrerá nos vários minutos de hora de início de um cluster frio. Para obter mais informações, consulte Tempo de execução da integração do Azure.

Azure Integration Runtime

Importante

A seleção do Tempo de Execução de Integração na atividade Fluxo de Dados só se aplica a execuções acionadas do seu pipeline. A depuração do pipeline com fluxos de dados é executada no cluster especificado na sessão de depuração.

PolyBase

Se você estiver usando um Azure Synapse Analytics como um coletor ou fonte, deverá escolher um local de preparo para sua carga em lote do PolyBase. O PolyBase permite o carregamento em lote em massa em vez de carregar os dados linha por linha. O PolyBase reduz drasticamente o tempo de carregamento no Azure Synapse Analytics.

Chave de ponto de verificação

Ao usar a opção de captura de alterações para fontes de fluxo de dados, o ADF mantém e gerencia o ponto de verificação para você automaticamente. A chave de ponto de verificação padrão é um hash do nome do fluxo de dados e do nome do pipeline. Se você estiver usando um padrão dinâmico para suas tabelas ou pastas de origem, talvez queira substituir esse hash e definir seu próprio valor de chave de ponto de verificação aqui.

Nível de registo

Se você não precisar que todas as execuções de pipeline de suas atividades de fluxo de dados registrem totalmente todos os logs de telemetria detalhados, você pode, opcionalmente, definir seu nível de log como "Básico" ou "Nenhum". Ao executar seus fluxos de dados no modo "Detalhado" (padrão), você está solicitando que o serviço registre totalmente a atividade em cada nível de partição individual durante a transformação de dados. Essa pode ser uma operação cara, portanto, apenas habilitar detalhadamente a solução de problemas pode melhorar o fluxo geral de dados e o desempenho do pipeline. O modo "Básico" registra apenas as durações de transformação, enquanto "Nenhum" fornece apenas um resumo das durações.

Nível de registo

Propriedades do lavatório

O recurso de agrupamento em fluxos de dados permite que você defina a ordem de execução de seus coletores, bem como agrupe coletores usando o mesmo número de grupo. Para ajudar a gerenciar grupos, você pode pedir ao serviço para executar coletores, no mesmo grupo, em paralelo. Você também pode definir o grupo de coletores para continuar mesmo depois que um dos coletores encontrar um erro.

O comportamento padrão dos coletores de fluxo de dados é executar cada coletor sequencialmente, de maneira serial, e falhar o fluxo de dados quando um erro for encontrado no coletor. Além disso, todos os coletores são padronizados para o mesmo grupo, a menos que você entre nas propriedades de fluxo de dados e defina prioridades diferentes para os coletores.

Propriedades do lavatório

Apenas na primeira fila

Esta opção só está disponível para fluxos de dados que tenham coletores de cache habilitados para "Saída para atividade". A saída do fluxo de dados que é injetado diretamente em seu pipeline é limitada a 2MB. Definir "somente primeira linha" ajuda a limitar a saída de dados do fluxo de dados ao injetar a saída da atividade de fluxo de dados diretamente no pipeline.

Parametrização de fluxos de dados

Conjuntos de dados parametrizados

Se o fluxo de dados usar conjuntos de dados parametrizados, defina os valores dos parâmetros na guia Configurações .

Executar parâmetros de fluxo de dados

Fluxos de dados parametrizados

Se o fluxo de dados for parametrizado, defina os valores dinâmicos dos parâmetros de fluxo de dados na guia Parâmetros . Você pode usar a linguagem de expressão de pipeline ou a linguagem de expressão de fluxo de dados para atribuir valores de parâmetros dinâmicos ou literais. Para obter mais informações, consulte Parâmetros de fluxo de dados.

Propriedades de computação parametrizadas.

Você pode parametrizar a contagem de núcleos ou o tipo de computação se usar o tempo de execução da Integração do Azure de autoresolução e especificar valores para compute.coreCount e compute.computeType.

Exemplo de parâmetro de fluxo de dados Execute

Depuração de pipeline da atividade de fluxo de dados

Para executar um pipeline de depuração executado com uma atividade de Fluxo de Dados, você deve ativar o modo de depuração de fluxo de dados por meio do controle deslizante Depurar Fluxo de Dados na barra superior. O modo de depuração permite executar o fluxo de dados em um cluster ativo do Spark. Para obter mais informações, consulte Modo de depuração.

Captura de tela que mostra onde está o botão Depurar

O pipeline de depuração é executado no cluster de depuração ativo, não no ambiente de tempo de execução de integração especificado nas configurações de atividade do Fluxo de Dados. Você pode escolher o ambiente de computação de depuração ao iniciar o modo de depuração.

Monitorando a atividade de fluxo de dados

A atividade Fluxo de Dados tem uma experiência especial de monitoramento, onde você pode visualizar informações de particionamento, tempo de estágio e linhagem de dados. Abra o painel de monitorização através do ícone de óculos em Ações. Para obter mais informações, consulte Monitorando fluxos de dados.

Usar resultados da atividade Fluxo de Dados em uma atividade subsequente

A atividade de fluxo de dados produz métricas relativas ao número de linhas gravadas em cada coletor e linhas lidas de cada fonte. Esses resultados são retornados na output seção do resultado da execução da atividade. As métricas retornadas estão no formato do json abaixo.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Por exemplo, para chegar ao número de linhas gravadas em um coletor chamado 'sink1' em uma atividade chamada 'dataflowActivity', use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Para obter o número de linhas lidas de uma fonte chamada 'source1' que foi usada nesse coletor, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Nota

Se um coletor tiver zero linhas escritas, ele não aparecerá nas métricas. A existência pode ser verificada usando a contains função. Por exemplo, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') verifica se alguma linha foi gravada no sink1.

Veja as atividades de fluxo de controle suportadas: