Formato Delta no Azure Data Factory

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Dica

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Este artigo destaca como copiar dados de e para um data lake armazenado no Azure Data Lake Store Gen2 ou no Armazenamento de Blobs do Azure usando o formato delta. Esse conector está disponível como um conjunto de dados embutido no fluxo de dados de mapeamento como uma fonte e um coletor.

Propriedades do fluxo de dados de mapeamento

Esse conector está disponível como um conjunto de dados embutido no fluxo de dados de mapeamento como uma fonte e um coletor.

Propriedades da Origem

A tabela abaixo lista as propriedades com suporte por uma fonte delta. Você pode editar essas propriedades na guia Opções de Origem.

Nome Descrição Obrigatório Valores permitidos Propriedade do script do Fluxo de Dados
Formatar O formato deve ser delta sim delta format
Sistema de arquivos O sistema de contêiner/arquivos do delta lake sim String fileSystem
Caminho da pasta O diretório do lago delta sim String folderPath
Tipo de compactação O tipo de compactação da tabela delta não bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Nível de Compactação Escolha se a compactação será concluída o mais rapidamente possível ou se o arquivo resultante deve ter sua compactação otimizada. obrigatório se compressedType for especificado. Optimal ou Fastest compressionLevel
Viagem no tempo Escolha se deseja consultar um instantâneo mais antigo de uma tabela delta não Consulta por carimbo de data/hora: Carimbo de data/hora
Consultar por versão: Inteiro
timestampAsOf
versionAsOf
Permitir nenhum arquivo encontrado Se for true, um erro não será gerado caso nenhum arquivo for encontrado não true ou false ignoreNoFilesFound

Importar esquema

O Delta só está disponível como um conjunto de dados embutido e, por padrão, não tem um esquema associado. Para obter metadados de coluna, clique no botão Importar esquema na guia Projeção. Isso permite que você referencie os nomes de colunas e os tipos de dados especificados pelo corpus. Para importar o esquema, uma sessão de depuração de fluxo de dados deve estar ativa e você deve ter um arquivo de definição de entidade CDM existente para o qual apontar.

Exemplo de script de origem delta

source(output(movieId as integer,
            title as string,
            releaseDate as date,
            rated as boolean,
            screenedOn as timestamp,
            ticketPrice as decimal(10,2)
            ),
    store: 'local',
    format: 'delta',
    versionAsOf: 0,
    allowSchemaDrift: false,
    folderPath: $tempPath + '/delta'
  ) ~> movies

Propriedades do coletor

A tabela abaixo lista as propriedades compatíveis com uma fonte delta. Você pode editar essas propriedades na guia Configurações.

Nome Descrição Obrigatório Valores permitidos Propriedade do script do Fluxo de Dados
Formatar O formato deve ser delta sim delta format
Sistema de arquivos O sistema de contêiner/arquivos do delta lake sim String fileSystem
Caminho da pasta O diretório do lago delta sim String folderPath
Tipo de compactação O tipo de compactação da tabela delta não bzip2
gzip
deflate
ZipDeflate
snappy
lz4
TarGZip
tar
compressionType
Nível de Compactação Escolha se a compactação será concluída o mais rapidamente possível ou se o arquivo resultante deve ter sua compactação otimizada. obrigatório se compressedType for especificado. Optimal ou Fastest compressionLevel
Vacuum Exclui arquivos mais antigos do que a duração especificada que não é mais relevante para a versão atual da tabela. Quando um valor de 0 ou menos é especificado, a operação de vácuo não é executada. sim Integer vácuo
Ação tabela Informa ao ADF o que fazer com a tabela Delta de destino no coletor. Você pode deixá-lo como está e acrescentar novas linhas, substituir a definição de tabela e os dados existentes com novos metadados e dados, ou manter a estrutura de tabela existente, mas primeiro truncar todas as linhas e, em seguida, inserir as novas linhas. não Nenhum, truncado, substituir deltaTruncate, substituir
Método Update Ao selecionar "Permitir inserção" sozinho ou gravar em uma nova tabela delta, o destino recebe todas as linhas de entrada, independentemente das políticas de linha definidas. Se seus dados contiverem linhas de outras políticas de linha, elas precisarão ser excluídas usando uma transformação de filtro anterior.

Quando todos os métodos de atualização são selecionados, uma mesclagem é executada, na qual as linhas são inseridas/excluídas/upserted/atualizadas de acordo com o conjunto de políticas de linha usando uma transformação de Alterar Linha.
sim true ou false insertable
deletable
upsertable
Pode ser atualizado
Gravação otimizada Obtenha uma taxa de transferência mais alta para a operação de gravação por meio da otimização do modo aleatório interno em executores do Spark. Como resultado, você pode notar menos partições e arquivos de tamanho maior não true ou false optimizedWrite: true
Compactar automaticamente Após a conclusão de qualquer operação de gravação, o Spark executará automaticamente o OPTIMIZE comando para reorganizar os dados, resultando em mais partições, se necessário, para melhor leitura do desempenho no futuro não true ou false autoCompact: true

Exemplo de script do coletor Delta

O script de fluxo de dados associado é:

moviesAltered sink(
          input(movieId as integer,
                title as string
            ),
           mapColumn(
                movieId,
                title
            ),
           insertable: true,
           updateable: true,
           deletable: true,
           upsertable: false,
           keys: ['movieId'],
            store: 'local',
           format: 'delta',
           vacuum: 180,
           folderPath: $tempPath + '/delta'
           ) ~> movieDB

Coletor delta com remoção de partições

Com essa opção no método de atualização acima (ou seja, atualizar/executar upsert/excluir), você pode limitar o número de partições inspecionadas. Somente as partições que satisfazem essa condição são obtidas do armazenamento de destino. Você pode especificar um conjunto fixo de valores que uma coluna de partição pode assumir.

Screenshot of partition pruning options are available to limit the inspection.

Exemplo de script do coletor delta com remoção de partições

Um script de exemplo é fornecido a seguir.

DerivedColumn1 sink( 
      input(movieId as integer,
            title as string
           ), 
      allowSchemaDrift: true,
      validateSchema: false,
      format: 'delta',
      container: 'deltaContainer',
      folderPath: 'deltaPath',
      mergeSchema: false,
      autoCompact: false,
      optimizedWrite: false,
      vacuum: 0,
      deletable:false,
      insertable:true,
      updateable:true,
      upsertable:false,
      keys:['movieId'],
      pruneCondition:['part_col' -> ([5, 8])],
      skipDuplicateMapInputs: true,
      skipDuplicateMapOutputs: true) ~> sink2
 

O Delta lerá apenas 2 partições em que part_col == 5 e 8 do repositório delta de destino, em vez de todas as partições. part_col é uma coluna pela qual os dados delta de destino são particionados. Ela não precisa estar presente nos dados de origem.

Opções de otimização do coletor delta

Na guia Configurações, você encontra mais três opções para otimizar a transformação do coletor delta.

  • Quando a opção Mesclar esquema está habilitada, ela permite a evolução do esquema, ou seja, todas as colunas presentes no fluxo de entrada atual, mas não na tabela Delta de destino, são automaticamente adicionadas ao seu esquema. Essa opção tem suporte em todos os métodos de atualização.

  • Quando a Compactação automática está habilitada, após uma gravação individual, a transformação verifica se os arquivos podem ser compactados e executa um trabalho de otimização rápida (com tamanhos de arquivo de 128 MB em vez de 1 GB) para compactar ainda mais os arquivos para partições que têm o maior número de arquivos pequenos. A compactação automática ajuda a unir um grande número de arquivos pequenos em um número menor de arquivos grandes. A compactação automática só entra em vigor quando há pelo menos 50 arquivos. Depois que uma operação de compactação é executada, ela cria uma versão da tabela e grava um novo arquivo contendo os dados de vários arquivos anteriores em um formulário compactado.

  • Quando a gravação Otimizar está habilitada, a transformação do coletor otimiza dinamicamente os tamanhos de partição com base nos dados reais, tentando gravar arquivos de 128 MB para cada partição de tabela. Esse é um tamanho aproximado, e pode variar dependendo das características do conjunto de dados. As gravações otimizadas aprimoram a eficiência geral das gravações e leituras posteriores. Ela organiza as partições de forma que o desempenho das leituras subsequentes melhore.

Dica

O processo de gravação otimizado reduzirá o trabalho de ETL geral porque o Coletor emitirá o comando Spark Delta Lake Optimize depois que os dados forem processados. É recomendável usar a Gravação Otimizada com moderação. Por exemplo, se você tiver um pipeline de dados por hora, execute um fluxo de dados com Gravação Otimizada diariamente.

Limitações conhecidas

Ao gravar em um coletor delta, há uma limitação conhecida em que o número de linhas gravadas não aparecerá na saída de monitoramento.