Partilhar via


Padrão para acumular dados incrementalmente com o Dataflow Gen2

Este tutorial leva 15 minutos e descreve como acumular dados incrementalmente em uma casa de lago usando o Dataflow Gen2.

O acúmulo incremental de dados em um destino de dados requer uma técnica para carregar apenas dados novos ou atualizados em seu destino de dados. Essa técnica pode ser feita usando uma consulta para filtrar os dados com base no destino dos dados. Este tutorial mostra como criar um fluxo de dados para carregar dados de uma fonte OData numa lakehouse e como adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino pretendido.

As etapas de alto nível neste tutorial são as seguintes:

  • Crie um fluxo de dados para carregar dados de uma fonte OData em uma casa de lago.
  • Adicione uma consulta ao fluxo de dados para filtrar os dados com base no destino dos dados.
  • (Opcional) recarregar dados usando notebooks e pipelines.

Pré-requisitos

Você deve ter um espaço de trabalho habilitado para Microsoft Fabric. Se você ainda não tiver um, consulte Criar um espaço de trabalho. Além disso, o tutorial pressupõe que você esteja usando a exibição de diagrama no Dataflow Gen2. Para verificar se está a utilizar a Vista de Diagrama, no friso superior aceda a Vista e certifique-se de que a Vista de Diagrama está selecionada.

Criar um fluxo de dados para carregar dados de uma fonte OData em uma casa de lago

Nesta secção, o utilizador criará um fluxo de dados para carregar dados de uma fonte OData num lakehouse.

  1. Crie uma nova casa do lago no seu espaço de trabalho.

    Captura de ecrã a mostrar a caixa de diálogo de criação de lakehouse.

  2. Crie um novo Dataflow Gen2 em seu espaço de trabalho.

    Captura de ecrã mostrando o menu suspenso Criar Fluxo de Dados.

  3. Adicione uma nova fonte ao fluxo de dados. Selecione a fonte OData e insira o seguinte URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Captura de ecrã mostrando a caixa de diálogo para obter dados.

    Captura de tela mostrando o conector OData.

    Captura de tela mostrando as configurações do OData.

  4. Selecione a tabela Pedidos e selecione Avançar.

    Captura de tela mostrando a caixa de diálogo da tabela de pedidos selecionados.

  5. Selecione as seguintes colunas para manter:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Captura de tela mostrando a função de escolher colunas.

    Captura de tela mostrando a tabela de ordens de escolha de colunas.

  6. Altere o tipo de dados de OrderDate, RequiredDatee ShippedDate para datetime.

    Captura de ecrã a mostrar a função de alteração do tipo de dados.

  7. Configure o destino dos dados para sua casa do lago usando as seguintes configurações:

    • Destino dos dados: Lakehouse
    • Lakehouse: Selecione a lakehouse que você criou na etapa 1.
    • Novo nome da tabela: Orders
    • Método de atualização: Replace

    Imagem de ecrã mostrando a barra de ferramentas lakehouse de destino de dados.

    Captura de ecrã mostrando a tabela de pedidos do lakehouse de destino de dados.

    Captura de tela mostrando a substituição das configurações da casa do lago de destino de dados.

  8. selecione Avançar e publique o fluxo de dados.

    Captura de tela mostrando a caixa de diálogo de fluxo de dados de publicação.

Agora você criou um fluxo de dados para carregar dados de uma fonte OData em uma casa de lago. Esse fluxo de dados é usado na próxima seção para adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino dos dados. Depois disso, pode-se usar o fluxo de dados para recarregar os dados utilizando notebooks e pipelines.

Adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino dos dados

Esta seção adiciona uma consulta ao fluxo de dados para filtrar os dados com base nos dados no lakehouse de destino. A consulta obtém o máximo OrderID na lakehouse no início da atualização do fluxo de dados e usa o OrderId máximo para obter apenas os pedidos com um OrderId mais alto da origem para anexar ao seu destino de dados. Isso pressupõe que as ordens sejam adicionadas à fonte em ordem crescente de OrderID. Se esse não for o caso, você pode usar uma coluna diferente para filtrar os dados. Por exemplo, você pode usar a OrderDate coluna para filtrar os dados.

Nota

Os filtros OData são aplicados no Fabric depois que os dados são recebidos da fonte de dados, no entanto, para fontes de banco de dados como o SQL Server, o filtro é aplicado na consulta enviada à fonte de dados de back-end e apenas as linhas filtradas são retornadas ao serviço.

  1. Depois que o fluxo de dados for atualizado, reabra o fluxo de dados criado na seção anterior.

    Captura de tela mostrando a caixa de diálogo de fluxo de dados aberto.

  2. Crie uma nova consulta com o nome IncrementalOrderID e obtenha dados da tabela Pedidos na casa do lago que você criou na seção anterior.

    Captura de ecrã mostrando a caixa de diálogo para obter dados.

    Captura de ecrã a mostrar o conector lakehouse.

    Captura de ecrã mostrando a tabela get orders lakehouse.

    Captura de ecrã mostrando a função renomear consulta.

    Captura de tela mostrando a consulta renomeada.

  3. Desative o preparo desta consulta.

    Captura de ecrã a mostrar a função de desativação do ambiente de testes.

  4. Na visualização de dados, clique com o botão direito do mouse na coluna OrderID e selecione Detalhar.

    Captura de ecrã mostrando a função de detalhamento.

  5. Na faixa de opções, selecione Ferramentas da Lista ->Estatísticas ->Máximo.

    Captura de ecrã mostrando a função de estatísticas

Agora tens uma consulta que retorna o OrderID máximo no lakehouse. Essa consulta é usada para filtrar os dados da fonte OData. A próxima seção adiciona uma consulta ao fluxo de dados para filtrar os dados da fonte OData com base no OrderID máximo na data lakehouse.

  1. Volte para a consulta Pedidos e adicione uma nova etapa para filtrar os dados. Utilize as seguintes definições:

    • Coluna: OrderID
    • Funcionamento: Greater than
    • Valor: parâmetro IncrementalOrderID

    Captura de tela mostrando a função orderid maior que o filtro.

    Captura de ecrã a mostrar as definições do filtro.

  2. Permita combinar os dados da fonte OData e do lakehouse ao confirmar a seguinte caixa de diálogo:

    Captura de tela mostrando a caixa de diálogo permitir combinação de dados.

  3. Atualize o destino dos dados para usar as seguintes configurações:

    • Método de atualização: Append

    Captura de tela mostrando a função de editar configurações de saída.

    Captura de ecrã a mostrar a tabela de encomendas existente.

    Captura de tela mostrando as configurações do lakehouse de destino de dados anexadas.

  4. Publique o fluxo de dados.

    Captura de tela mostrando a caixa de diálogo de fluxo de dados de publicação.

O seu fluxo de dados agora contém uma consulta que filtra os dados da fonte OData com base no OrderID máximo no lakehouse. Isso significa que apenas dados novos ou atualizados são carregados na casa do lago. A próxima seção utiliza o fluxo de dados para recarregar informações utilizando notebooks e pipelines.

(Opcional) recarregar dados usando notebooks e pipelines

Opcionalmente, você pode recarregar dados específicos usando blocos de anotações e pipelines. Com código Python personalizado no notebook, removem-se os dados antigos do lakehouse. Em seguida, ao criar um pipeline no qual o bloco de anotações é executado primeiro e em seguida o fluxo de dados é executado sequencialmente, recarregam-se os dados da fonte OData no lakehouse. Os notebooks suportam vários idiomas, mas este tutorial usa o PySpark. O Pyspark é uma API Python para o Spark e é usado neste tutorial para executar consultas SQL do Spark.

  1. Crie um novo bloco de notas na sua área de trabalho.

    Captura de ecrã a mostrar a nova caixa de diálogo do bloco de notas.

  2. Adicione o seguinte código PySpark ao seu bloco de notas:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Execute o notebook para verificar se os dados foram removidos do lakehouse.

  4. Crie um novo pipeline em seu espaço de trabalho.

    Captura de ecrã mostrando a nova janela de diálogo do pipeline.

  5. Adicione uma nova atividade de notebook ao pipeline e selecione o notebook criado na etapa anterior.

    Captura de ecrã a mostrar a caixa de diálogo de adicionar uma atividade ao bloco de notas.

    Captura de ecrã que mostra a caixa de diálogo do selecionar bloco de notas.

  6. Adicione uma nova atividade de fluxo de dados ao pipeline e selecione o fluxo de dados criado na seção anterior.

    Captura de ecrã mostrando a caixa de diálogo de adicionar atividade de fluxo de dados.

    Captura de ecrã que mostra a caixa de diálogo para selecionar o fluxo de dados.

  7. Vincule a atividade do bloco de anotações à atividade de fluxo de dados com um gatilho de sucesso.

    Captura de tela mostrando a caixa de diálogo de atividades de conexão.

  8. Salve e execute o pipeline.

    Captura de ecrã que mostra a caixa de diálogo da execução do pipeline.

Agora, tem um pipeline que elimina dados antigos do lakehouse e carrega novamente os dados da fonte OData no lakehouse. Com esta configuração, é possível recarregar os dados da fonte OData no lakehouse regularmente.