Övning: Integrera SQL- och Spark-pooler i Azure Synapse Analytics

Slutförd

I följande övning utforskar vi integreringen av SQL- och Apache Spark-pooler i Azure Synapse Analytics.

Integrera SQL- och Apache Spark-pooler i Azure Synapse Analytics

Du vill skriva till en dedikerad SQL-pool när du har utfört datateknikuppgifter i Spark och sedan referera till SQL-pooldata som en källa för anslutning till Apache Spark DataFrames som innehåller data från andra filer.

Du bestämmer dig för att använda Azure Synapse Apache Spark till Synapse SQL-anslutningsappen för att effektivt överföra data mellan Spark-pooler och SQL-pooler i Azure Synapse.

Överföring av data mellan Apache Spark-pooler och SQL-pooler kan göras med hjälp av JavaDataBaseConnectivity (JDBC). Men med tanke på två distribuerade system som Apache Spark och SQL-pooler tenderar JDBC att vara en flaskhals med seriell dataöverföring.

Azure Synapse Apache Spark-poolen till Synapse SQL-anslutningsappen är en implementering av datakällan för Apache Spark. Den använder Azure Data Lake Storage Gen2 och PolyBase i SQL-pooler för att effektivt överföra data mellan Spark-klustret och Synapse SQL-instansen.

  1. Om vi vill använda Apache Spark-poolen till Synapse SQL Connector (sqlanalytics) är ett alternativ att skapa en tillfällig vy över data i DataFrame. Kör koden nedan i en ny cell för att skapa en vy med namnet top_purchases:

    # Create a temporary view for top purchases 
    topPurchases.createOrReplaceTempView("top_purchases")
    

    Vi skapade en ny tillfällig vy från dataramen topPurchases som vi skapade tidigare och som innehåller de utplattade JSON-användarköpsdata.

  2. Vi måste köra kod som använder Apache Spark-poolen till Synapse SQL-anslutningsappen i Scala. För att göra det lägger vi till magin %%spark i cellen. Kör koden nedan i en ny cell för att läsa från top_purchases vyn:

    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df = spark.sqlContext.sql("select * from top_purchases")
    df.write.sqlanalytics("SQLPool01.wwi.TopPurchases", Constants.INTERNAL)
    

    Kommentar

    Cellen kan ta över en minut att köra. Om du har kört det här kommandot tidigare får du ett felmeddelande om att "Det finns redan och objektet heter..." eftersom tabellen redan finns.

    När cellen har körts tar vi en titt på listan över SQL-pooltabeller för att kontrollera att tabellen har skapats åt oss.

  3. Lämna anteckningsboken öppen och navigera sedan till datahubben (om den inte redan är markerad).

    Datahubben är markerad

  4. Välj fliken Arbetsyta(1), expandera SQL-poolen, välj ellipserna (...) i Tabeller (2) och välj Uppdatera (3). wwi.TopPurchases Expandera tabellen och kolumnerna (4).

    Tabellen visas.

    Som du ser wwi.TopPurchases skapades tabellen automatiskt åt oss, baserat på det härledda schemat för Apache Spark DataFrame. Apache Spark-poolen till Synapse SQL-anslutningsappen var ansvarig för att skapa tabellen och effektivt läsa in data i den.

  5. Gå tillbaka till notebook-filen och kör koden nedan i en ny cell för att läsa försäljningsdata från alla Parquet-filer som finns i sale-small/Year=2019/Quarter=Q4/Month=12/ mappen:

    dfsales = spark.read.load('abfss://wwi-02@' + datalake + '.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet')
    display(dfsales.limit(10))
    

    Kommentar

    Det kan ta över 3 minuter innan cellen körs.

    Variabeln datalake som vi skapade i den första cellen används här som en del av filsökvägen.

    Cellresultatet visas.

    Jämför filsökvägen i cellen ovan med filsökvägen i den första cellen. Här använder vi en relativ sökväg för att läsa in alla försäljningsdata för december 2019 från Parquet-filerna som finns i sale-small, jämfört med bara den 31 december 2010.

    Nu ska vi läsa in TopSales data från SQL-pooltabellen som vi skapade tidigare i en ny Apache Spark DataFrame och sedan ansluta dem till den nya dfsales DataFrame. För att göra det måste vi återigen använda magin %%spark i en ny cell eftersom vi använder Apache Spark-poolen till Synapse SQL-anslutningsappen för att hämta data från SQL-poolen. Sedan måste vi lägga till DataFrame-innehållet i en ny tillfällig vy så att vi kan komma åt data från Python.

  6. Kör koden nedan i en ny cell för att läsa från SQL-pooltabellen TopSales och spara den i en tillfällig vy:

    %%spark
    // Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
    val df2 = spark.read.sqlanalytics("SQLPool01.wwi.TopPurchases")
    df2.createTempView("top_purchases_sql")
    
    df2.head(10)
    

    Cellen och dess utdata visas enligt beskrivningen.

    Cellens språk är inställt på Scala med hjälp av magin %%spark(1) överst i cellen. Vi deklarerade en ny variabel med namnet df2 som en ny DataFrame som skapats av spark.read.sqlanalytics metoden, som läser från TopPurchases tabellen (2) i SQL-poolen. Sedan fyllde vi i en ny tillfällig vy med namnet top_purchases_sql(3). Slutligen visade vi de första 10 posterna med df2.head(10)) raden (4). Cellutmatningen visar DataFrame-värdena (5).

  7. Kör koden nedan i en ny cell för att skapa en ny DataFrame i Python från den top_purchases_sql tillfälliga vyn och visa sedan de första 10 resultaten:

    dfTopPurchasesFromSql = sqlContext.table("top_purchases_sql")
    
    display(dfTopPurchasesFromSql.limit(10))
    

    DataFrame-koden och utdata visas.

  8. Kör koden nedan i en ny cell för att koppla data från Parquet-försäljningsfilerna TopPurchases och SQL-poolen:

    inner_join = dfsales.join(dfTopPurchasesFromSql,
        (dfsales.CustomerId == dfTopPurchasesFromSql.visitorId) & (dfsales.ProductId == dfTopPurchasesFromSql.productId))
    
    inner_join_agg = (inner_join.select("CustomerId","TotalAmount","Quantity","itemsPurchasedLast12Months","top_purchases_sql.productId")
        .groupBy(["CustomerId","top_purchases_sql.productId"])
        .agg(
            sum("TotalAmount").alias("TotalAmountDecember"),
            sum("Quantity").alias("TotalQuantityDecember"),
            sum("itemsPurchasedLast12Months").alias("TotalItemsPurchasedLast12Months"))
        .orderBy("CustomerId") )
    
    display(inner_join_agg.limit(100))
    

    I frågan anslöt vi till dfsales dataramarna och dfTopPurchasesFromSql och matchade på CustomerId och ProductId. I denna koppling kombinerades TopPurchases SQL-poolens tabelldata med Parquet-data för försäljning i december 2019 (1).

    Vi grupperade efter fälten CustomerId och ProductId . ProductId Eftersom fältnamnet är tvetydigt (det finns i båda DataFrames) var vi tvungna att fullständigt kvalificera ProductId namnet för att referera till det i TopPurchases DataFrame (2).

    Sedan skapade vi ett aggregat som summerade det totala beloppet som spenderades på varje produkt i december, det totala antalet produktartiklar i december och det totala antalet produktartiklar som köpts under de senaste 12 månaderna (3).

    Slutligen visade vi anslutna och aggregerade data i en tabellvy.

    Välj kolumnrubrikerna i tabellvyn för att sortera resultatuppsättningen.

    Cellinnehållet och utdata visas.