Transformação do coletor no mapeamento do fluxo de dados

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Os fluxos de dados estão disponíveis no Azure Data Factory e no Azure Synapse Pipelines. Este artigo aplica-se ao mapeamento de fluxos de dados. Se você é novo em transformações, consulte o artigo introdutório Transformar dados usando um fluxo de dados de mapeamento.

Depois de concluir a transformação dos dados, escreva-os em um repositório de destino usando a transformação do coletor. Cada fluxo de dados requer pelo menos uma transformação de coletor, mas você pode gravar em quantos coletores forem necessários para concluir seu fluxo de transformação. Para gravar em coletores adicionais, crie novos fluxos por meio de novas ramificações e divisões condicionais.

Cada transformação de coletor está associada a exatamente um objeto de conjunto de dados ou serviço vinculado. A transformação do coletor determina a forma e o local dos dados nos quais você deseja gravar.

Conjuntos de dados embutidos

Ao criar uma transformação de coletor, escolha se as informações do coletor são definidas dentro de um objeto de conjunto de dados ou dentro da transformação do coletor. A maioria dos formatos está disponível em apenas um ou outro. Para saber como usar um conector específico, consulte o documento do conector apropriado.

Quando há suporte para um formato embutido e em um objeto de conjunto de dados, há benefícios para ambos. Os objetos de conjunto de dados são entidades reutilizáveis que podem ser usadas em outros fluxos de dados e atividades, como Copiar. Essas entidades reutilizáveis são especialmente úteis quando você usa um esquema protegido. Os conjuntos de dados não são baseados no Spark. Ocasionalmente, talvez seja necessário substituir determinadas configurações ou projeção de esquema na transformação do coletor.

Os conjuntos de dados embutidos são recomendados quando você usa esquemas flexíveis, instâncias de coletor únicas ou coletores parametrizados. Se o coletor for fortemente parametrizado, os conjuntos de dados embutidos permitirão que você não crie um objeto "fictício". Os conjuntos de dados embutidos são baseados no Spark e suas propriedades são nativas do fluxo de dados.

Para usar um conjunto de dados embutido, selecione o formato desejado no seletor Tipo de coletor . Em vez de selecionar um conjunto de dados de coletor, selecione o serviço vinculado ao qual deseja se conectar.

Screenshot that shows Inline selected.

Banco de dados do espaço de trabalho (somente espaços de trabalho Synapse)

Ao usar fluxos de dados em espaços de trabalho do Azure Synapse, você terá uma opção adicional para agrupar seus dados diretamente em um tipo de banco de dados que esteja dentro do seu espaço de trabalho Synapse. Tal reduzirá a necessidade de adicionar serviços ou conjuntos de dados ligados a essas bases de dados. Os bancos de dados criados por meio dos modelos de banco de dados do Azure Synapse também são acessíveis quando você seleciona Banco de Dados de Espaço de Trabalho.

Nota

O conector de banco de dados do Azure Synapse Workspace está atualmente em visualização pública e só pode funcionar com bancos de dados do Spark Lake no momento

Screenshot that shows workspace db selected.

Tipos de lavatório suportados

O fluxo de dados de mapeamento segue uma abordagem ELT (extração, carga e transformação) e funciona com conjuntos de dados de preparo que estão todos no Azure. Atualmente, os seguintes conjuntos de dados podem ser usados em uma transformação de coletor.

Conector Format Conjunto de dados/embutido
Armazenamento de Blobs do Azure Avro
Texto delimitado
Delta
JSON
ORC
Parquet
✓/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Azure Cosmos DB para NoSQL ✓/-
Armazenamento do Azure Data Lake Ger1 Avro
Texto delimitado
JSON
ORC
Parquet
✓/-
✓/-
✓/-
✓/✓
✓/-
Azure Data Lake Storage Gen2 (Armazenamento do Azure Data Lake Gen2) Avro
Common Data Model
Texto delimitado
Delta
JSON
ORC
Parquet
✓/✓
-/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Base de Dados do Azure para MySQL ✓/✓
Base de Dados do Azure para PostgreSQL ✓/✓
Azure Data Explorer ✓/✓
Base de Dados SQL do Azure ✓/✓
Instância Gerida do SQL no Azure ✓/-
Azure Synapse Analytics ✓/-
Dataverse ✓/✓
Dynamics 365 ✓/✓
Dynamics CRM ✓/✓
Tecido Lakehouse ✓/✓
SFTP Avro
Texto delimitado
JSON
ORC
Parquet
✓/✓
✓/✓
✓/✓
✓/✓
✓/✓
Snowflake ✓/✓
SQL Server ✓/✓

As configurações específicas desses conectores estão localizadas na guia Configurações. Exemplos de script de fluxo de dados e informações sobre essas configurações estão localizados na documentação do conector.

O serviço tem acesso a mais de 90 conectores nativos. Para gravar dados nessas outras fontes a partir do seu fluxo de dados, use a Atividade de cópia para carregar esses dados de um coletor suportado.

Configurações do coletor

Depois de adicionar um coletor, configure por meio da guia Coletor . Aqui você pode escolher ou criar o conjunto de dados no qual seu coletor grava. Os valores de desenvolvimento para parâmetros de conjunto de dados podem ser configurados em Configurações de depuração. (O modo de depuração deve estar ativado.)

O vídeo a seguir explica várias opções diferentes de coletor para tipos de arquivo delimitados por texto.

Screenshot that shows Sink settings.

Desvio de esquema: desvio de esquema é a capacidade do serviço de lidar nativamente com esquemas flexíveis em seus fluxos de dados sem a necessidade de definir explicitamente as alterações de coluna. Habilite Permitir desvio de esquema para escrever colunas adicionais sobre o que está definido no esquema de dados do coletor.

Validar esquema: Se validar esquema estiver selecionado, o fluxo de dados falhará se alguma coluna na projeção do coletor não for encontrada no repositório do coletor ou se os tipos de dados não corresponderem. Use essa configuração para impor que o esquema do coletor atenda ao contrato de sua projeção definida. É útil em cenários de coletor de banco de dados para sinalizar que os nomes ou tipos de coluna foram alterados.

Coletor de cache

Um coletor de cache é quando um fluxo de dados grava dados no cache do Spark em vez de um armazenamento de dados. No mapeamento de fluxos de dados, você pode fazer referência a esses dados dentro do mesmo fluxo muitas vezes usando uma pesquisa de cache. Isso é útil quando você deseja fazer referência a dados como parte de uma expressão, mas não deseja unir explicitamente as colunas a ela. Exemplos comuns em que um coletor de cache pode ajudar são procurar um valor máximo em um armazenamento de dados e fazer a correspondência de códigos de erro com um banco de dados de mensagens de erro.

Para gravar em um coletor de cache, adicione uma transformação de coletor e selecione Cache como o tipo de coletor. Ao contrário de outros tipos de coletor, você não precisa selecionar um conjunto de dados ou serviço vinculado porque não está gravando em um armazenamento externo.

Select cache sink

Nas configurações do coletor, você pode, opcionalmente, especificar as colunas de chave do coletor de cache. Eles são usados como condições de correspondência ao usar a lookup() função em uma pesquisa de cache. Se você especificar colunas de chave, não poderá usar a outputs() função em uma pesquisa de cache. Para saber mais sobre a sintaxe de pesquisa de cache, consulte Pesquisas em cache.

Cache sink key columns

Por exemplo, se eu especificar uma única coluna de chave de em um coletor de cache chamado cacheExample, a chamada cacheExample#lookup() terá um parâmetro que especifica qual linha no coletor de column1 cache deve corresponder. A função gera uma única coluna complexa com subcolunas para cada coluna mapeada.

Nota

Um coletor de cache deve estar em um fluxo de dados completamente independente de qualquer transformação que faça referência a ele por meio de uma pesquisa de cache. Um coletor de cache também deve ser o primeiro coletor gravado.

Gravar na saída da atividade O coletor armazenado em cache pode, opcionalmente, gravar seus dados de saída na entrada da próxima atividade do pipeline. Isso permitirá que você passe dados de forma rápida e fácil para fora de sua atividade de fluxo de dados sem a necessidade de persistir os dados em um armazenamento de dados.

Método de atualização

Para tipos de coletor de banco de dados, a guia Configurações incluirá uma propriedade "Método de atualização". O padrão é inserir, mas também inclui opções de caixa de seleção para atualizar, atualizar e excluir. Para utilizar essas opções adicionais, você precisará adicionar uma transformação Alter Row antes do coletor. A linha Alter permitirá definir as condições para cada uma das ações do banco de dados. Se sua fonte for uma fonte nativa habilitada pelo CDC, você poderá definir os métodos de atualização sem uma linha de alteração, pois o ADF já está ciente dos marcadores de linha para inserir, atualizar, atualizar e excluir.

Mapeamento de campos

Semelhante a uma transformação select, na guia Mapeamento do coletor, você pode decidir quais colunas de entrada serão gravadas. Por padrão, todas as colunas de entrada, incluindo colunas derivadas, são mapeadas. Esse comportamento é conhecido como mapeamento automático.

Ao desativar o mapeamento automático, você pode adicionar mapeamentos fixos baseados em colunas ou mapeamentos baseados em regras. Com mapeamentos baseados em regras, você pode escrever expressões com correspondência de padrões. O mapeamento fixo mapeia nomes de colunas lógicas e físicas. Para obter mais informações sobre mapeamento baseado em regras, consulte Padrões de coluna no mapeamento de fluxo de dados.

Ordenação de sinks personalizada

Por padrão, os dados são gravados em vários coletores em uma ordem não determinística. O mecanismo de execução grava dados em paralelo à medida que a lógica de transformação é concluída, e a ordem do coletor pode variar a cada execução. Para especificar uma ordem de coletor exata, habilite Ordenação de coletor personalizada na guia Geral do fluxo de dados. Quando ativados, os coletores são gravados sequencialmente em ordem crescente.

Screenshot that shows Custom sink ordering.

Nota

Ao utilizar pesquisas em cache, certifique-se de que o pedido do coletor tenha os coletores em cache definidos como 1, o mais baixo (ou o primeiro) em ordem.

Custom sink ordering

Grupos de lavatórios

Você pode agrupar coletores aplicando o mesmo número de ordem para uma série de coletores. O serviço tratará esses coletores como grupos que podem ser executados em paralelo. As opções para execução paralela aparecerão na atividade de fluxo de dados do pipeline.

Erros

Na guia erros de coletor, você pode configurar a manipulação de linha de erro para capturar e redirecionar a saída para erros de driver de banco de dados e asserções com falha.

Ao gravar em bancos de dados, determinadas linhas de dados podem falhar devido a restrições definidas pelo destino. Por padrão, uma execução de fluxo de dados falhará no primeiro erro que receber. Em determinados conectores, você pode optar por Continuar no erro que permite que o fluxo de dados seja concluído, mesmo que linhas individuais tenham erros. Atualmente, esse recurso só está disponível no Banco de Dados SQL do Azure e no Azure Synapse. Para obter mais informações, consulte Tratamento de linha de erro no SQL DB do Azure.

Abaixo está um tutorial em vídeo sobre como usar o tratamento de linha de erro de banco de dados automaticamente em sua transformação de coletor.

Para linhas de falha de declaração, você pode usar a transformação Assert upstream em seu fluxo de dados e, em seguida, redirecionar asserções com falha para um arquivo de saída aqui na guia erros de coletor. Você também tem uma opção aqui para ignorar linhas com falhas de asserção e não enviar essas linhas para o armazenamento de dados de destino do coletor.

Assert failure rows

Pré-visualização de dados no coletor

Ao buscar uma visualização de dados no modo de depuração, nenhum dado será gravado no coletor. Um instantâneo da aparência dos dados será retornado, mas nada será gravado no seu destino. Para testar a gravação de dados em seu coletor, execute uma depuração de pipeline a partir da tela do pipeline.

Script de fluxo de dados

Exemplo

Abaixo está um exemplo de uma transformação de coletor e seu script de fluxo de dados:

sink(input(
		movie as integer,
		title as string,
		genres as string,
		year as integer,
		Rating as integer
	),
	allowSchemaDrift: true,
	validateSchema: false,
	deletable:false,
	insertable:false,
	updateable:true,
	upsertable:false,
	keys:['movie'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	saveOrder: 1,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

Agora que você criou seu fluxo de dados, adicione uma atividade de fluxo de dados ao seu pipeline.