Tutorial do lakehouse: preparar e transformar dados no lakehouse
Neste tutorial, você usará notebooks com o runtime do Spark para transformar e preparar os dados brutos no lakehouse.
Pré-requisitos
Se você não tiver um lakehouse que contenha dados, você deverá:
Preparar dados
Nas etapas anteriores do tutorial, temos dados brutos ingeridos da origem para a seção Arquivos do lakehouse. Agora você pode transformar esses dados e prepará-los para criar tabelas delta.
Baixe os notebooks na pasta Código-fonte do Tutorial do Lakehouse.
No comutador localizado na parte inferior esquerda da tela, selecione Engenharia de Dados.
Selecione Importar notebook na seção Novo, na parte superior da página de aterrissagem.
Selecione Carregar no painel Importar status que se abre no lado direito da tela.
Selecione todos os notebooks que foram baixados na primeira etapa desta seção.
Selecione Abrir. Uma notificação que indica o status da importação aparece no canto superior direito da janela do navegador.
Depois que a importação for bem-sucedida, acesse a exibição de itens do espaço de trabalho e veja os notebooks recém-importados. Selecione o lakehouse wwilakehouse para abri-lo.
Depois que o lakehouse wwilakehouse for aberto, selecione Abrir notebook>Notebook existente no menu de navegação superior.
Na lista de notebooks existentes, selecione o notebook 01 – Criar Tabelas Delta e selecione Abrir.
No notebook aberto no Explorer do lakehouse, você verá que o notebook já está vinculado ao seu lakehouse aberto.
Observação
O Fabric fornece o recurso de ordem V para gravar arquivos Delta lake otimizados. A ordem V geralmente melhora a compactação em três a quatro vezes e a aceleração do desempenho em até 10 vezes em relação aos arquivos Delta Lake que não são otimizados. O Spark no Fabric otimiza dinamicamente as partições ao gerar arquivos com um tamanho padrão de 128 MB. O tamanho do arquivo de destino pode ser alterado por requisitos de carga de trabalho usando configurações.
Com o recurso de otimizar gravação, o mecanismo do Apache Spark reduz o número de arquivos gravados e visa aumentar o tamanho do arquivo individual dos dados gravados.
Antes de gravar dados como tabelas Delta lake na seção Tabelas do lakehouse, use dois recursos do Fabric (ordem V e Otimizar Gravação) para otimizar a gravação de dados e melhorar o desempenho da leitura. Para habilitar esses recursos em sua sessão, defina essas configurações na primeira célula do notebook.
Para iniciar o notebook e executar todas as células em sequência, selecione Executar tudo na faixa de opções superior (em Início). Ou, para executar apenas o código de uma célula específica, selecione o ícone Executar que aparece à esquerda da célula ao passar o mouse ou pressione SHIFT + ENTER no teclado enquanto o controle estiver na célula.
Ao executar uma célula, você não precisou especificar o pool do Spark subjacente ou os detalhes do cluster porque o Fabric os fornece por meio do Pool Dinâmico. Todo workspace do Fabric vem com um pool Spark padrão, chamado Pool Dinâmico. Isso significa que, ao criar notebooks, você não precisa se preocupar em especificar nenhuma configuração do Spark ou detalhes do cluster. Ao executar o primeiro comando do notebook, o pool dinâmico entra em funcionamento em alguns segundos. E a sessão do Spark é estabelecida e começa a executar o código. A execução subsequente do código é quase instantânea nesse notebook enquanto a sessão do Spark estiver ativa.
Em seguida, leia os dados brutos da seção Arquivos do lakehouse e adicione mais colunas para diferentes partes da data como parte da transformação. Por fim, use a API partition By Spark para particionar os dados antes de gravá-los no formato tabela delta com base nas colunas de partes de dados recém-criadas (Ano e Trimestre).
from pyspark.sql.functions import col, year, month, quarter table_name = 'fact_sale' df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full') df = df.withColumn('Year', year(col("InvoiceDateKey"))) df = df.withColumn('Quarter', quarter(col("InvoiceDateKey"))) df = df.withColumn('Month', month(col("InvoiceDateKey"))) df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
Depois que as tabelas de fatos forem carregadas, você poderá prosseguir com o carregamento de dados para o restante das dimensões. A célula a seguir cria uma função para ler dados brutos da seção Arquivos do lakehouse para cada um dos nomes de tabela passados como parâmetro. Em seguida, criará uma lista de tabelas de dimensão. Por fim, percorrerá a lista de tabelas e criará uma tabela Delta para cada nome de tabela lido do parâmetro de entrada. Observe que o script removerá a coluna denominada
Photo
neste exemplo, pois ela não é utilizada.from pyspark.sql.types import * def loadFullDataFromSource(table_name): df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name) df = df.drop("Photo") df.write.mode("overwrite").format("delta").save("Tables/" + table_name) full_tables = [ 'dimension_city', 'dimension_date', 'dimension_employee', 'dimension_stock_item' ] for table in full_tables: loadFullDataFromSource(table)
Para validar as tabelas criadas, clique com o botão direito do mouse e selecione Atualizar no lakehouse wwilakehouse. As tabelas são exibidas.
Vá para a exibição de itens do workspace novamente e selecione o lakehouse wwilakehouse para abri-lo.
Agora, abra o segundo notebook. Na exibição do lakehouse, selecione Abrir notebook>Notebook existente na faixa de opções.
Na lista de notebooks existentes, selecione o notebook 02 – Transformação de Dados – Negócios para abri-lo.
No notebook aberto no Explorer do lakehouse, você verá que o notebook já está vinculado ao seu lakehouse aberto.
Uma organização pode ter engenheiros de dados trabalhando com Scala/Python e outros engenheiros de dados trabalhando com SQL (Spark SQL ou T-SQL), todos trabalhando na mesma cópia dos dados. O Fabric possibilita que esses diferentes grupos, com experiências e preferências variadas, trabalhem e colaborem. As duas abordagens diferentes transformam e geram agregações de negócios. Você pode escolher a mais adequada para você ou misturar e combinar essas abordagens de acordo com sua preferência, sem comprometer o desempenho:
Abordagem nº 1: use o PySpark para unir e agregar dados para gerar agregados de negócios. Essa abordagem é preferível para alguém com experiência em programação (Python ou PySpark).
Abordagem nº 2: use o Spark SQL para unir e agregar dados para gerar agregações de negócios. Essa abordagem é preferível para alguém com experiência em SQL que está fazendo a transição para o Spark.
Abordagem nº 1 (sale_by_date_city): use o PySpark para unir e agregar dados para gerar agregados de negócios. Com o código a seguir, crie três quadros de dados diferentes do Spark, cada um fazendo referência a uma tabela Delta existente. Em seguida, junte essas tabelas usando os dataframes, faça o agrupamento pela geração de agregação, renomeie algumas colunas e, por fim, escreva-as como uma tabela Delta na seção Tabelas do lakehouse para persistir com os dados.
Nessa célula, crie três dataframes Spark diferentes, cada um fazendo referência a uma tabela delta existente.
df_fact_sale = spark.read.table("wwilakehouse.fact_sale") df_dimension_date = spark.read.table("wwilakehouse.dimension_date") df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
Nessa célula, integre essas tabelas usando os dataframes criados anteriormente, faça o agrupamento por geração de agregação, renomeie algumas colunas e, por fim, escreva-as como uma tabela Delta na seção Tabelas do lakehouse.
sale_by_date_city = df_fact_sale.alias("sale") \ .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \ .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \ .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\ .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\ .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\ .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\ .withColumnRenamed("sum(Profit)", "SumOfProfit")\ .orderBy("date.Date", "city.StateProvince", "city.City") sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
Abordagem nº 2 (sale_by_date_employee): use o Spark SQL para unir e agregar dados para gerar agregados de negócios. Com o código a seguir, crie uma exibição temporária do Spark unindo três tabelas, faça agrupamento por para gerar agregação e renomeie algumas colunas. Por fim, leia a exibição temporária do Spark e escreva-a como uma tabela Delta na seção Tabelas do lakehouse para persistir com os dados.
Nessa célula, crie uma exibição temporária do Spark unindo três tabelas, faça agrupamento por para gerar agregação e renomeie algumas colunas.
%%sql CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee AS SELECT DD.Date, DD.CalendarMonthLabel , DD.Day, DD.ShortMonth Month, CalendarYear Year ,DE.PreferredName, DE.Employee ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax ,SUM(FS.TaxAmount) SumOfTaxAmount ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax ,SUM(Profit) SumOfProfit FROM wwilakehouse.fact_sale FS INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
Nessa célula, leia a exibição temporária do Spark criada na célula anterior e, por fim, grave-a como uma tabela Delta na seção Tabelas do lakehouse.
sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee") sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
Para validar as tabelas criadas, clique com o botão direito do mouse e selecione Atualizar no lakehouse wwilakehouse. As tabelas de agregação são exibidas.
Ambas as abordagens produzem um resultado semelhante. Para minimizar a necessidade de aprender uma nova tecnologia ou comprometer o desempenho, escolha a melhor abordagem para a sua preferência e background.
Você notará que está gravando dados como arquivos Delta lake. O recurso de descoberta e registro automático de tabelas do Fabric coleta e registra essas tabelas no metastore. Você não precisa chamar explicitamente as instruções de CREATE TABLE
para criar tabelas a serem usadas com o SQL.