Padrão para acumular dados de forma incremental com o Dataflow Gen2
Importante
Esse é um padrão para acumular dados de forma incremental com o Dataflow Gen2. Isso não é o mesmo que a atualização incremental. A atualização incremental é um recurso que está em desenvolvimento no momento. Esse recurso é uma das ideias mais votadas em nosso site de ideias. Você pode votar neste recurso no site Ideias do Fabric.
Este tutorial leva 15 minutos e descreve como acumular dados de forma incremental em um lakehouse usando o Dataflow Gen2.
O acúmulo incremental de dados em um destino de dados requer uma técnica para carregar apenas dados novos ou atualizados em seu destino de dados. Essa técnica pode ser feita usando uma consulta para filtrar os dados com base no destino de dados. Este tutorial mostra como criar um fluxo de dados para carregar dados de uma fonte OData em um lakehouse e como adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino dos dados.
As etapas de alto nível neste tutorial são as seguintes:
- Crie um fluxo de dados para carregar dados de uma fonte OData em um lakehouse.
- Adicione uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados.
- (Opcional) recarregar dados usando notebooks e pipelines.
Pré-requisitos
Você deve ter um workspace habilitado para o Microsoft Fabric. Se você ainda não tiver um, consulte Criar um workspace. Além disso, o tutorial pressupõe que você esteja usando a exibição de diagrama no Fluxo de Dados Gen2. Para verificar se você está usando a exibição de diagrama, na faixa de opções superior, vá para Exibir e verifique se a Exibição de diagrama está selecionada.
Criar um fluxo de dados para carregar dados de uma fonte OData em um lakehouse
Nesta seção, você criará um fluxo de dados para carregar dados de uma fonte OData em um lakehouse.
Crie um novo lakehouse em seu workspace.
Crie um novo Fluxo de Dados Gen2 em seu workspace.
Adicione uma nova fonte ao fluxo de dados. Selecione a origem OData e insira a seguinte URL:
https://services.OData.org/V4/Northwind/Northwind.svc
Selecione a tabela Pedidos e selecione Avançar.
Selecione as seguintes colunas para manter:
OrderID
CustomerID
EmployeeID
OrderDate
RequiredDate
ShippedDate
ShipVia
Freight
ShipName
ShipAddress
ShipCity
ShipRegion
ShipPostalCode
ShipCountry
Altere o tipo de dados de
OrderDate
,RequiredDate
eShippedDate
paradatetime
.Configure o destino de dados para o lakehouse usando as seguintes configurações:
- Destino dos dados:
Lakehouse
- Lakehouse: selecione a lakehouse que você criou na etapa 1.
- Nome da nova tabela:
Orders
- Método de atualização:
Replace
- Destino dos dados:
selecione Avançar e publique o fluxo de dados.
Agora você criou um fluxo de dados para carregar dados de uma fonte OData em um lakehouse. Esse fluxo de dados é usado na próxima seção para adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados. Depois disso, você pode usar o fluxo de dados para recarregar dados usando notebooks e pipelines.
Adicionar uma consulta ao fluxo de dados para filtrar os dados com base no destino de dados
Esta seção adiciona uma consulta ao fluxo de dados para filtrar os dados com base nos dados no lakehouse de destino. A consulta obtém o OrderID
máximo no lakehouse no início da atualização do fluxo de dados e usa o OrderId máximo para obter apenas os pedidos com um OrderId mais alto de para a origem para acrescentar ao seu destino de dados. Isso pressupõe que os pedidos sejam adicionados à origem em ordem crescente de OrderID
. Se esse não for o caso, você poderá usar uma coluna diferente para filtrar os dados. Por exemplo, você pode usar a coluna OrderDate
para filtrar os dados.
Observação
Os filtros OData são aplicados no Fabric depois que os dados são recebidos da fonte de dados, no entanto, para fontes de banco de dados como o SQL Server, o filtro é aplicado na consulta enviada à fonte de dados de back-end e somente as linhas filtradas são retornadas ao serviço.
Depois que o fluxo de dados for atualizado, reabra o fluxo de dados criado na seção anterior.
Crie uma nova consulta chamada
IncrementalOrderID
e obtenha dados da tabela Pedidos na lakehouse que você criou na seção anterior.Desabilite o preparo dessa consulta.
Na visualização de dados, clique com o botão direito do mouse na coluna
OrderID
e selecione Fazer drill down.Na faixa de opções, selecione Ferramentas de Lista ->Estatísticas ->Máximo.
Agora você tem uma consulta que retorna o OrderID máximo no lakehouse. Essa consulta é usada para filtrar os dados da fonte OData. A próxima seção adiciona uma consulta ao fluxo de dados para filtrar os dados da fonte OData com base no OrderID máximo no lakehouse.
Voltar à consulta Ordens e adicione uma nova etapa para filtrar os dados. Use as configurações a seguir:
- Coluna:
OrderID
- Operação:
Greater than
- Valor: parâmetro
IncrementalOrderID
- Coluna:
Permita a combinação dos dados da fonte OData e da lakehouse confirmando a seguinte caixa de diálogo:
Atualize o destino de dados para usar as seguintes configurações:
- Método de atualização:
Append
- Método de atualização:
Publique o fluxo de dados.
Seu fluxo de dados agora contém uma consulta que filtra os dados da fonte OData com base no OrderID máximo no lakehouse. Isso significa que apenas dados novos ou atualizados são carregados no lakehouse. A próxima seção usa o fluxo de dados para recarregar dados usando notebooks e pipelines.
(Opcional) recarregar dados usando notebooks e pipelines
Opcionalmente, você pode recarregar dados específicos usando notebooks e pipelines. Com o código python personalizado no notebook, você remove os dados antigos do lakehouse. Ao criar um pipeline no qual você primeiro executa o notebook e executa sequencialmente o fluxo de dados, recarrega os dados da fonte OData no lakehouse. Os notebooks dão suporte a vários idiomas, mas este tutorial usa o PySpark. O Pyspark é uma API do Python para Spark e é usado neste tutorial para executar consultas SQL do Spark.
Criar um novo notebook no seu workspace.
Adicione o código PySpark a seguir ao seu notebook:
### Variables LakehouseName = "YOURLAKEHOUSE" TableName = "Orders" ColName = "OrderID" NumberOfOrdersToRemove = "10" ### Remove Old Orders Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect() Reload = Reload[0].ReLoadValue spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
Execute o notebook para verificar se os dados foram removidos do lakehouse.
Crie um pipeline no seu workspace.
Adicione uma nova atividade de notebook ao pipeline e selecione o notebook que você criou na etapa anterior.
Adicione uma nova atividade de fluxo de dados ao pipeline e selecione o fluxo de dados que você criou na seção anterior.
Vincule a atividade do notebook à atividade de fluxo de dados com um gatilho de êxito.
Salve e execute o pipeline.
Agora você tem um pipeline que remove dados antigos do lakehouse e recarrega os dados da fonte OData para o lakehouse. Com essa configuração, você pode recarregar os dados da fonte OData no lakehouse regularmente.
Comentários
https://aka.ms/ContentUserFeedback.
Em breve: Ao longo de 2024, eliminaremos os problemas do GitHub como o mecanismo de comentários para conteúdo e o substituiremos por um novo sistema de comentários. Para obter mais informações, consulteEnviar e exibir comentários de