Transformação de coletor em fluxo de dados de mapeamento

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Os fluxos de dados estão disponíveis nos pipelines do Azure Data Factory e do Azure Synapse. Este artigo se aplica ao fluxo de dados de mapeamento. Se você for iniciante nas transformações, veja o artigo introdutório Transformar dados usando um fluxo de dados de mapeamento.

Depois de terminar de transformar seus dados, grave-os em um repositório de destino usando a transformação de coletor. Cada fluxo de dados requer pelo menos uma transformação de coletor, mas é possível gravar tantos coletores quantos forem necessários para concluir o fluxo de transformação. Para gravar em coletores adicionais, crie novos fluxos por meio de novas ramificações e divisões condicionais.

Cada transformação de coletor é associada a exatamente um conjunto de dados ou serviço vinculado. A transformação de coletor determina a forma e a localização na qual você deseja gravar os dados.

Conjuntos de valores embutidos

Ao criar uma transformação de coletor, escolha se as informações do coletor estão definidas dentro de um objeto de conjunto de dados ou na transformação de coletor. A maioria dos formatos está disponível somente em um ou outro. Para saber como usar um conector específico, consulte o documento do conector apropriado.

Quando um formato de objeto, tanto embutido quanto de um conjunto de dados têm suporte, há benefícios para ambos. Os objetos conjunto de dados são entidades reutilizáveis que podem ser usadas em outros fluxos de dados e atividades como Copy. Essas entidades reutilizáveis são especialmente úteis ao usar um esquema protegido. Conjuntos de dados não são baseados no Spark. Ocasionalmente, talvez seja necessário substituir certas configurações ou projeção do esquema na transformação de coletor.

Os conjuntos de linhas embutidos são recomendados quando são usados esquemas flexíveis, instâncias de coletor único ou coletores com parâmetros. Se seu coletor for muito parametrizado, os conjuntos de fontes embutidos permitem não criar um objeto "fictício". Os conjuntos de dados embutidos são baseados no Spark e suas propriedades são nativas ao fluxo de dados.

Para usar um conjunto de linhas embutido, selecione o formato desejado no seletor de Tipo de coletor. Em vez de selecionar um conjunto de um conjunto de dados do coletor, selecione o serviço vinculado ao qual deseja se conectar.

Screenshot that shows Inline selected.

BD do Workspace (somente workspaces do Synapse)

Ao usar fluxos de dados em workspaces do Azure Synapse, você terá uma opção adicional de coletor dos dados diretamente para um tipo de banco de dados que está dentro do workspace do Synapse. Isso atenuará a necessidade de adicionar conjuntos de dados ou serviços vinculados para esses bancos de dados. Os bancos de dados criados por meio dos modelos de banco de dados do Azure Synapse também podem ser acessados quando você seleciona o BD do Workspace.

Observação

O conector do BD do Workspace do Azure Synapse está atualmente em versão prévia pública e só pode funcionar com bancos de dados do Spark Lake no momento

Screenshot that shows workspace db selected.

Tipos de coletor com suporte

O fluxo de dados de mapeamento segue uma abordagem de ELT (extração, carregamento, transformação) e funciona com conjuntos de dados de preparo que estão todos no Azure. Atualmente, os seguintes conjuntos de dados podem ser usados em uma transformação de coletor.

Connector Formatar Conjunto de dados/Embutido
Armazenamento de Blobs do Azure Avro
Texto delimitado
Delta
JSON
ORC
Parquet
✓/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Azure Cosmos DB para NoSQL ✓/-
Azure Data Lake Storage Gen1 Avro
Texto delimitado
JSON
ORC
Parquet
✓/-
✓/-
✓/-
✓/✓
✓/-
Azure Data Lake Storage Gen2 Avro
Common Data Service
Texto delimitado
Delta
JSON
ORC
Parquet
✓/✓
-/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Banco de Dados do Azure para MySQL ✓/✓
Banco de Dados do Azure para PostgreSQL ✓/✓
Azure Data Explorer ✓/✓
Banco de Dados SQL do Azure ✓/✓
Instância Gerenciada do SQL do Azure ✓/-
Azure Synapse Analytics ✓/-
Dataverse ✓/✓
Dynamics 365 ✓/✓
Dynamics CRM ✓/✓
Fabric Lakehouse ✓/✓
SFTP Avro
Texto delimitado
JSON
ORC
Parquet
✓/✓
✓/✓
✓/✓
✓/✓
✓/✓
Snowflake ✓/✓
SQL Server ✓/✓

As configurações específicas para esses conectores estão localizadas na guia Configurações. Exemplos de script de fluxo de dados e informações dessas configurações estão localizados na documentação do conector.

O serviço tem acesso a mais de 90 conectores nativos. Para gravar nessas origens do fluxo de dados, use a atividade Copy para carregar os dados de um coletor com suporte.

Configurações do coletor

Depois de adicionar um coletor, configure-o por meio da guia Coletor. Aqui, é possível escolher ou criar o conjunto de dados para o qual seu coletor irá gravar. Os valores de desenvolvimento para parâmetros de conjunto de dados podem ser definidos em Configurações de depuração. (O modo de depuração deve estar ativado.)

O vídeo a seguir explica várias opções de coletor diferentes para tipos de arquivo delimitados por texto.

Screenshot that shows Sink settings.

Descompasso de esquema: descompasso de esquema é a capacidade do serviço de lidar, de forma nativa, com esquemas flexíveis nos fluxos de dados, sem a necessidade de definir explicitamente as alterações de coluna. Habilite Permitir descompasso de esquema para gravar colunas adicionais sobre o que está definido no esquema de dados do coletor.

Validar esquema: se validar esquema for selecionado, o fluxo de dados falhará se qualquer coluna na projeção do coletor não for encontrada no armazenamento do coletor ou se os tipos de dados não corresponderem. Use essa configuração para impor que o esquema do coletor atenda ao contrato de sua projeção definida. Ela é útil em cenários de coletor de banco de dados para sinalizar que os nomes ou tipos de coluna foram alterados.

Coletor de cache

Um coletor de cache é quando um fluxo de dados grava dados no cache do Spark em vez de em um repositório de dados. No fluxos de dados de mapeamento, é possível fazer referência a esses dados dentro do mesmo fluxo, muitas vezes usando uma pesquisa de cache. Isso é útil quando desejar fazer referência a dados como parte de uma expressão, mas não desejar unir explicitamente as colunas a ela. Exemplos comuns são quando um coletor de cache ajuda a pesquisar um valor máximo em um armazenamento de dados e os códigos de erro correspondentes a um banco de dado de mensagens de erro.

Para gravar em um coletor de cache, adicione uma transformação de coletor e selecione Cache como o tipo de coletor. Ao contrário de outros tipos de coletor, não é necessário selecionar um conjunto de dados ou um serviço vinculado porque não haverá gravação em um repositório externo.

Select cache sink

Nas configurações do coletor, é possível, de forma opcional, especificar as colunas de chave do coletor de cache. Elas são usadas como condições de correspondência ao usar a lookup() função em uma pesquisa de cache. Se especificar colunas de chave, não poderá usar a função outputs() em uma pesquisa de cache. Para saber mais sobre a sintaxe da pesquisa de cache, consulte pesquisas de cache.

Cache sink key columns

Por exemplo, se especificar uma única coluna de chave de column1 em um coletor de cache chamado cacheExample, a chamada cacheExample#lookup() teria um parâmetro que especifica a qual linha no coletor de cache deve corresponder. A função gera uma única coluna complexa com subcolunas para cada coluna mapeada.

Observação

Um coletor de cache deve estar em um fluxo de dados completamente independente de qualquer transformação que faça referência a ele por meio de uma pesquisa de cache. Um coletor de cache também deve ser o primeiro coletor gravado.

Gravar na saída da atividade O coletor armazenado em cache pode, opcionalmente, gravar seus dados de saída na entrada da próxima atividade de pipeline. Isso permitirá que você passe dados de forma rápida e fácil de sua atividade de fluxo de dados sem a necessidade de persistir os dados em um armazenamento de dados.

Método Update

Para tipos de coletor de banco de dados, a guia Configurações incluirá uma propriedade "Método de atualização". O padrão é inserir, mas também inclui opções de caixa de seleção para atualização, upsert e exclusão. Para utilizar essas opções adicionais, adicione uma transformação Alter Row antes do coletor. A Alter Row permitirá que você defina as condições para cada uma das ações do banco de dados. Se sua origem for uma fonte de habilitação CDC nativa, você poderá definir os métodos de atualização sem uma Alter Row, pois o ADF já estará ciente dos marcadores de linha para inserir, atualizar, executar upsert e excluir.

Mapeamento de campo

Semelhante a uma transformação selecionar, na guia Mapeamento do coletor, é possível decidir quais colunas de entrada serão gravadas. Por padrão, todas as colunas de entrada, incluindo colunas descompassos, são mapeadas. O comportamento é conhecido como mapeamento automático.

Ao desativar o mapeamento automático, é possível adicionar mapeamentos fixos baseados em colunas ou mapeamentos baseados em regras. Com mapeamentos baseados em regras, é possível escrever expressões com correspondência de padrões. O mapeamento fixo mapeia nomes de coluna lógicos e físicos. Para obter mais informações sobre mapeamento baseado em regras, consulte Padrões de coluna no fluxo de dados de mapeamento.

Ordenação de coletor personalizado

Por padrão, os dados são gravados em vários coletores em uma ordem não determinística. O mecanismo de execução grava dados em paralelo, pois a lógica de transformação é concluída e a ordenação do coletor pode variar para cada execução. Para especificar uma ordenação exata do coletor, habilite a Ordenação personalizada do coletor na guia Geral do fluxo de dados. Quando habilitada, os coletores são gravados sequencialmente em ordem crescente.

Screenshot that shows Custom sink ordering.

Observação

Ao utilizar pesquisas armazenadas em cache, certifique-se de que a ordenação de coletor tenha os coletores em cache definidos como 1, o mais baixo (ou primeiro) na ordenação.

Custom sink ordering

Grupos de coletor

É possível agrupar coletores aplicando o mesmo número de ordem para uma série de coletores. O serviço tratará esses coletores como grupos que podem ser executados em paralelo. As opções para execução paralela surgirão na atividade fluxo de dados do pipeline.

Errors

Na guia Erros do coletor, você pode configurar o tratamento de linha de erro para capturar e redirecionar a saída para erros de driver de banco de dados e declarações com falha.

Ao gravar em bancos de dados, determinadas linhas de dados podem falhar devido a restrições definidas pelo destino. Por padrão, uma execução de fluxo de dados falhará no primeiro erro obtido. Em determinados conectores, é possível optar por continuar se houver erro que permite que o fluxo de dados seja concluído mesmo se as linhas individuais tiverem erros. Atualmente, essa funcionalidade está disponível somente no Banco de Dados SQL do Azure e no Azure Synapse. Para obter mais informações, consulte erro de manipulação de linha no banco de dados SQL do Azure.

Veja abaixo um tutorial em vídeo sobre como usar a manipulação de linha de erro de banco de dados automaticamente na transformação do coletor.

Para linhas de falha de declaração, você pode usar a transformação Assert upstream no fluxo de dados e redirecionar declarações com falha para um arquivo de saída aqui na guia erros do coletor. Aqui, você também tem a opção de ignorar linhas com falhas de declaração e não gerar essas linhas no armazenamento de dados de destino do coletor.

Assert failure rows

Visualização de dados no coletor

Ao buscar uma visualização de dados em um modo de depuração, nenhum dado será gravado no coletor. Um instantâneo da aparência dos dados será retornado, mas nada será gravado no destino. Para testar a gravação de dados em seu coletor, execute uma depuração de pipeline na tela do pipeline.

Script de fluxo de dados

Exemplo

Abaixo está um exemplo de uma transformação de coletor e o respectivo script de fluxo de dados:

sink(input(
		movie as integer,
		title as string,
		genres as string,
		year as integer,
		Rating as integer
	),
	allowSchemaDrift: true,
	validateSchema: false,
	deletable:false,
	insertable:false,
	updateable:true,
	upsertable:false,
	keys:['movie'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	saveOrder: 1,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

Agora que seu fluxo de dados está criado, adicione uma atividade de execução de fluxo de dados ao seu pipeline.