Compartilhar via


Padrão para acumular dados de forma incremental com o Dataflow Gen2

Importante

Esse é um padrão para acumular dados de forma incremental com o Dataflow Gen2. Isso não é o mesmo que a atualização incremental. A atualização incremental é um recurso que está em desenvolvimento no momento. Esse recurso é uma das ideias mais votadas em nosso site de ideias. Você pode votar neste recurso no site Ideias do Fabric.

Este tutorial leva 15 minutos e descreve como acumular dados de forma incremental em um lakehouse usando o Dataflow Gen2.

O acúmulo incremental de dados em um destino de dados requer uma técnica para carregar apenas dados novos ou atualizados em seu destino de dados. Essa técnica pode ser feita usando uma consulta para filtrar os dados com base no destino de dados. Este tutorial mostra como criar um fluxo de dados para carregar dados de uma fonte OData em um lakehouse e como adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino dos dados.

As etapas de alto nível neste tutorial são as seguintes:

  • Crie um fluxo de dados para carregar dados de uma fonte OData em um lakehouse.
  • Adicione uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados.
  • (Opcional) recarregar dados usando notebooks e pipelines.

Pré-requisitos

Você deve ter um workspace habilitado para o Microsoft Fabric. Se você ainda não tiver um, consulte Criar um workspace. Além disso, o tutorial pressupõe que você esteja usando a exibição de diagrama no Fluxo de Dados Gen2. Para verificar se você está usando a exibição de diagrama, na faixa de opções superior, vá para Exibir e verifique se a Exibição de diagrama está selecionada.

Criar um fluxo de dados para carregar dados de uma fonte OData em um lakehouse

Nesta seção, você criará um fluxo de dados para carregar dados de uma fonte OData em um lakehouse.

  1. Crie um novo lakehouse em seu workspace.

    Captura de tela mostrando a caixa de diálogo de criação de um lakehouse.

  2. Crie um novo Fluxo de Dados Gen2 em seu workspace.

    Captura de tela mostrando a lista suspensa de criação de fluxo de dados.

  3. Adicione uma nova fonte ao fluxo de dados. Selecione a origem OData e insira a seguinte URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Captura de tela mostrando a caixa de diálogo obter dados.

    Captura de tela mostrando o conector OData.

    Captura de tela mostrando as configurações de OData.

  4. Selecione a tabela Pedidos e selecione Avançar.

    Captura de tela mostrando a caixa de diálogo da tabela selecionar pedidos.

  5. Selecione as seguintes colunas para manter:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Captura de tela mostrando a função escolher colunas.

    Captura de tela mostrando a tabela escolher ordem das colunas.

  6. Altere o tipo de dados de OrderDate, RequiredDatee ShippedDate para datetime.

    Captura de tela mostrando a função alterar tipo de dados.

  7. Configure o destino de dados para o lakehouse usando as seguintes configurações:

    • Destino dos dados: Lakehouse
    • Lakehouse: selecione a lakehouse que você criou na etapa 1.
    • Nome da nova tabela: Orders
    • Método de atualização: Replace

    Captura de tela mostrando a faixa de opções do lakehouse de destino de dados.

    Captura de tela mostrando a tabela de pedidos do lakehouse de destino de dados.

    Captura de tela mostrando a substituição de configurações do lakehouse de destino de dados.

  8. selecione Avançar e publique o fluxo de dados.

    Captura de tela mostrando a caixa de diálogo publicar fluxo de dados.

Agora você criou um fluxo de dados para carregar dados de uma fonte OData em um lakehouse. Esse fluxo de dados é usado na próxima seção para adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados. Depois disso, você pode usar o fluxo de dados para recarregar dados usando notebooks e pipelines.

Adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados

Esta seção adiciona uma consulta ao fluxo de dados para filtrar os dados com base nos dados no lakehouse de destino. A consulta obtém o OrderID máximo no lakehouse no início da atualização do fluxo de dados e usa o OrderId máximo para obter apenas os pedidos com um OrderId mais alto de para a origem para acrescentar ao seu destino de dados. Isso pressupõe que os pedidos sejam adicionados à origem em ordem crescente de OrderID. Se esse não for o caso, você poderá usar uma coluna diferente para filtrar os dados. Por exemplo, você pode usar a coluna OrderDate para filtrar os dados.

Observação

Os filtros OData são aplicados no Fabric depois que os dados são recebidos da fonte de dados, no entanto, para fontes de banco de dados como o SQL Server, o filtro é aplicado na consulta enviada à fonte de dados de back-end e somente as linhas filtradas são retornadas ao serviço.

  1. Depois que o fluxo de dados for atualizado, reabra o fluxo de dados criado na seção anterior.

    Captura de tela mostrando a caixa de diálogo abrir fluxo de dados.

  2. Crie uma nova consulta chamada IncrementalOrderID e obtenha dados da tabela Pedidos na lakehouse que você criou na seção anterior.

    Captura de tela mostrando a caixa de diálogo obter dados.

    Captura de tela mostrando o conector do lakehouse.

    Captura de tela mostrando a tabela obter pedidos do lakehouse.

    Captura de tela mostrando a função renomear consulta.

    Captura de tela mostrando a consulta renomeada.

  3. Desabilite o preparo dessa consulta.

    Captura de tela mostrando a função desabilitar preparo.

  4. Na visualização de dados, clique com o botão direito do mouse na coluna OrderID e selecione Fazer drill down.

    Captura de tela mostrando a função busca detalhada.

  5. Na faixa de opções, selecione Ferramentas de Lista ->Estatísticas ->Máximo.

    Captura de tela mostrando a função orderid máximo de estatísticas.

Agora você tem uma consulta que retorna o OrderID máximo no lakehouse. Essa consulta é usada para filtrar os dados da fonte OData. A próxima seção adiciona uma consulta ao fluxo de dados para filtrar os dados da fonte OData com base no OrderID máximo no lakehouse.

  1. Voltar à consulta Ordens e adicione uma nova etapa para filtrar os dados. Use as configurações a seguir:

    • Coluna: OrderID
    • Operação: Greater than
    • Valor: parâmetro IncrementalOrderID

    Captura de tela mostrando a função de filtro orderid maior que.

    Captura de tela mostrando as configurações de filtro.

  2. Permita a combinação dos dados da fonte OData e da lakehouse confirmando a seguinte caixa de diálogo:

    Captura de tela mostrando a caixa de diálogo para permitir a combinação de dados.

  3. Atualize o destino de dados para usar as seguintes configurações:

    • Método de atualização: Append

    Captura de tela mostrando a função de configurações de edição de saída.

    Captura de tela mostrando a tabela de pedidos existentes.

    Captura de tela mostrando o apêndice de configurações do lakehouse de destino de dados.

  4. Publique o fluxo de dados.

    Captura de tela mostrando a caixa de diálogo publicar fluxo de dados.

Seu fluxo de dados agora contém uma consulta que filtra os dados da fonte OData com base no OrderID máximo no lakehouse. Isso significa que apenas dados novos ou atualizados são carregados no lakehouse. A próxima seção usa o fluxo de dados para recarregar dados usando notebooks e pipelines.

(Opcional) recarregar dados usando notebooks e pipelines

Opcionalmente, você pode recarregar dados específicos usando notebooks e pipelines. Com o código python personalizado no notebook, você remove os dados antigos do lakehouse. Ao criar um pipeline no qual você primeiro executa o notebook e executa sequencialmente o fluxo de dados, recarrega os dados da fonte OData no lakehouse. Os notebooks dão suporte a vários idiomas, mas este tutorial usa o PySpark. O Pyspark é uma API do Python para Spark e é usado neste tutorial para executar consultas SQL do Spark.

  1. Criar um novo notebook no seu workspace.

    Captura de tela mostrando a caixa de diálogo novo notebook.

  2. Adicione o código PySpark a seguir ao seu notebook:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Execute o notebook para verificar se os dados foram removidos do lakehouse.

  4. Crie um pipeline no seu workspace.

    Captura de tela mostrando a caixa de diálogo novo pipeline.

  5. Adicione uma nova atividade de notebook ao pipeline e selecione o notebook que você criou na etapa anterior.

    Captura de tela mostrando a caixa de diálogo adicionar atividade de notebook.

    Captura de tela mostrando a caixa de diálogo selecionar notebook.

  6. Adicione uma nova atividade de fluxo de dados ao pipeline e selecione o fluxo de dados que você criou na seção anterior.

    Captura de tela mostrando a caixa de diálogo adicionar atividade de fluxo de dados.

    Captura de tela mostrando a caixa de diálogo selecionar fluxo de dados.

  7. Vincule a atividade do notebook à atividade de fluxo de dados com um gatilho de êxito.

    Captura de tela mostrando a caixa de diálogo conectar atividades.

  8. Salve e execute o pipeline.

    Captura de tela mostrando a caixa de diálogo executar pipeline.

Agora você tem um pipeline que remove dados antigos do lakehouse e recarrega os dados da fonte OData para o lakehouse. Com essa configuração, você pode recarregar os dados da fonte OData no lakehouse regularmente.