Condividi tramite


Esercitazione su Lakehouse: Preparare e trasformare i dati nella lakehouse

In questa esercitazione si usano notebook con il runtime di Spark per trasformare e preparare i dati non elaborati in lakehouse.

Prerequisiti

Se non si dispone di una lakehouse che contiene dati, è necessario:

Preparazione dei dati

Nei passaggi precedenti dell'esercitazione sono stati inseriti dati non elaborati dall'origine alla sezione File della lakehouse. È ora possibile trasformare i dati e prepararli per la creazione di tabelle Delta.

  1. Scaricare i notebook dalla cartella Lakehouse Tutorial Source Code .

  2. Dal commutatore in basso a sinistra della schermata selezionare Ingegneria dei dati.

    Screenshot che mostra dove trovare il commutatore e selezionare Ingegneria dei dati.

  3. Selezionare Importa notebook nella sezione Nuovo nella parte superiore della pagina di destinazione.

  4. Selezionare Carica nel riquadro Importa stato che si apre sul lato destro della schermata.

  5. Selezionare tutti i notebook scaricati nel primo passaggio di questa sezione.

    Screenshot che mostra dove trovare i notebook scaricati e il pulsante Apri.

  6. Selezionare Apri. Viene visualizzata una notifica che indica lo stato dell'importazione nell'angolo superiore destro della finestra del browser.

  7. Al termine dell'importazione, passare alla visualizzazione elementi dell'area di lavoro e visualizzare i notebook appena importati. Selezionare wwilakehouse lakehouse per aprirlo.

    Screenshot che mostra l'elenco dei notebook importati e la posizione in cui selezionare il lakehouse.

  8. Una volta aperto il lakehouse wwilakehouse, selezionare Apri notebook> esistente dal menu di spostamento in alto.

    Screenshot che mostra l'elenco dei notebook importati correttamente.

  9. Nell'elenco dei notebook esistenti selezionare il notebook 01 - Crea tabelle delta e selezionare Apri.

  10. Nel notebook aperto in Lakehouse Explorer si noterà che il notebook è già collegato alla lakehouse aperta.

    Nota

    Fabric offre la funzionalità dell'ordine V per scrivere file Delta Lake ottimizzati. L'ordine V spesso migliora la compressione di tre o quattro volte e fino a 10 volte l'accelerazione delle prestazioni sui file Delta Lake non ottimizzati. Spark in Fabric ottimizza in modo dinamico le partizioni durante la generazione di file con dimensioni predefinite di 128 MB. Le dimensioni del file di destinazione possono essere modificate in base ai requisiti del carico di lavoro usando le configurazioni.

    Con la funzionalità di scrittura ottimizzata, il motore Apache Spark riduce il numero di file scritti e mira ad aumentare le dimensioni dei singoli file dei dati scritti.

  11. Prima di scrivere dati come tabelle Delta Lake nella sezione Tabelle del lakehouse, si usano due funzionalità di Infrastruttura (ordine V e Ottimizzazione scrittura) per la scrittura ottimizzata dei dati e per migliorare le prestazioni di lettura. Per abilitare queste funzionalità nella sessione, impostare queste configurazioni nella prima cella del notebook.

    Per avviare il notebook ed eseguire tutte le celle in sequenza, selezionare Esegui tutto sulla barra multifunzione superiore (sotto Home). In alternativa, per eseguire codice solo da una cella specifica, selezionare l'icona Esegui visualizzata a sinistra della cella al passaggio del mouse oppure premere MAIUSC + INVIO sulla tastiera mentre il controllo si trova nella cella.

    Screenshot di una schermata di configurazione della sessione Spark, inclusa una cella di codice e l'icona Esegui.

    Quando si esegue una cella, non è necessario specificare i dettagli del pool o del cluster Spark sottostanti perché Fabric li fornisce tramite il pool live. Ogni area di lavoro infrastruttura include un pool di Spark predefinito, denominato Pool live. Ciò significa che quando si creano notebook, non è necessario preoccuparsi di specificare configurazioni Spark o dettagli del cluster. Quando si esegue il primo comando del notebook, il pool live è attivo e in esecuzione in pochi secondi. La sessione Spark viene stabilita e inizia a eseguire il codice. L'esecuzione successiva del codice è quasi istantanea in questo notebook mentre la sessione Spark è attiva.

  12. Successivamente, leggere i dati non elaborati dalla sezione File del lakehouse e aggiungere altre colonne per parti di data diverse come parte della trasformazione. Infine, si usa l'API Partition By Spark per partizionare i dati prima di scriverli come formato di tabella Delta in base alle colonne della parte di dati appena create (Anno e Trimestre).

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  13. Dopo il caricamento delle tabelle dei fatti, è possibile passare al caricamento dei dati per il resto delle dimensioni. La cella seguente crea una funzione per leggere i dati non elaborati dalla sezione Files della lakehouse per ognuno dei nomi di tabella passati come parametro. Crea quindi un elenco di tabelle delle dimensioni. Infine, scorre l'elenco delle tabelle e crea una tabella Delta per ogni nome di tabella letto dal parametro di input. Si noti che lo script elimina la colonna denominata Photo in questo esempio perché la colonna non viene usata.

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  14. Per convalidare le tabelle create, fare clic con il pulsante destro del mouse e selezionare Aggiorna nella wwilakehouse lakehouse . Vengono visualizzate le tabelle.

    Screenshot che mostra dove trovare le tabelle create in Lakehouse Explorer.

  15. Passare di nuovo alla visualizzazione degli elementi dell'area di lavoro e selezionare la wwilakehouse lakehouse per aprirla.

  16. Aprire ora il secondo notebook. Nella visualizzazione lakehouse selezionare Apri notebook> esistente dalla barra multifunzione.

  17. Nell'elenco dei notebook esistenti selezionare il notebook 02 - Trasformazione dati - Business per aprirlo.

    Screenshot del menu Apri blocco appunti esistente, che mostra dove selezionare il notebook.

  18. Nel notebook aperto in Lakehouse Explorer si noterà che il notebook è già collegato alla lakehouse aperta.

  19. Un'organizzazione potrebbe avere data engineer che lavorano con Scala/Python e altri data engineer che usano SQL (Spark SQL o T-SQL), tutti lavorando alla stessa copia dei dati. L'infrastruttura consente a questi diversi gruppi, con esperienze e preferenze diverse, di lavorare e collaborare. I due diversi approcci trasformano e generano aggregazioni aziendali. È possibile scegliere quello adatto per te o combinare questi approcci in base alle tue preferenze senza compromettere le prestazioni:

    • Approccio 1 : usare PySpark per unire e aggregare i dati per generare aggregazioni aziendali. Questo approccio è preferibile a un utente con un background di programmazione (Python o PySpark).

    • Approccio 2 : usare Spark SQL per unire e aggregare i dati per generare aggregazioni aziendali. Questo approccio è preferibile a un utente con background SQL, passando a Spark.

  20. Approccio 1 (sale_by_date_city): usare PySpark per unire e aggregare i dati per generare aggregazioni aziendali. Con il codice seguente si creano tre dataframe Spark diversi, ognuno dei quali fa riferimento a una tabella Delta esistente. Creare quindi un join di queste tabelle usando i dataframe, eseguire il raggruppamento per generare aggregazioni, rinominare alcune colonne e infine scriverla come tabella Delta nella sezione Tabelle del lakehouse per rendere persistenti i dati.

    In questa cella vengono creati tre dataframe Spark diversi, ognuno dei quali fa riferimento a una tabella Delta esistente.

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    In questa cella si uniscono queste tabelle usando i dataframe creati in precedenza, eseguire il raggruppamento per generare aggregazioni, rinominare alcune delle colonne e infine scriverla come tabella Delta nella sezione Tabelle del lakehouse.

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  21. Approccio 2 (sale_by_date_employee): usare Spark SQL per unire e aggregare i dati per generare aggregazioni aziendali. Con il codice seguente, si crea una vista Spark temporanea unendo tre tabelle, si esegue il raggruppamento per generare l'aggregazione e si rinominano alcune delle colonne. Infine, si legge dalla vista Spark temporanea e infine si scrive come tabella Delta nella sezione Tabelle del lakehouse per rendere persistenti i dati.

    In questa cella viene creata una vista Spark temporanea unendo tre tabelle, eseguendo il raggruppamento per generare l'aggregazione e rinominare alcune delle colonne.

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    In questa cella si legge dalla vista Spark temporanea creata nella cella precedente e infine la si scrive come tabella Delta nella sezione Tabelle della lakehouse.

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  22. Per convalidare le tabelle create, fare clic con il pulsante destro del mouse e selezionare Aggiorna nella wwilakehouse lakehouse . Vengono visualizzate le tabelle di aggregazione.

    Screenshot di Lakehouse Explorer che mostra dove vengono visualizzate le nuove tabelle.

I due approcci producono un risultato simile. Per ridurre al minimo la necessità di apprendere una nuova tecnologia o compromettere le prestazioni, scegliere l'approccio più adatto al proprio background e preferenza.

Si potrebbe notare che si stanno scrivendo dati come file Delta Lake. La funzionalità di individuazione e registrazione automatica delle tabelle di Fabric li preleva e li registra nel metastore. Non è necessario chiamare CREATE TABLE in modo esplicito le istruzioni per creare tabelle da usare con SQL.

Passaggio successivo