Cvičení – integrace poznámkového bloku v rámci kanálů Azure Synapse

Dokončeno

V této lekci vytvoříte poznámkový blok Azure Synapse Spark pro analýzu a transformaci dat načtených mapováním toku dat a uložíte je do datového jezera. Vytvoříte buňku parametru, která přijímá řetězcový parametr, který definuje název složky pro data, která poznámkový blok zapisuje do datového jezera.

Potom tento poznámkový blok přidáte do kanálu Synapse a předáte jedinečné ID spuštění kanálu do parametru poznámkového bloku, abyste mohli později korelovat spuštění kanálu s daty uloženými aktivitou poznámkového bloku.

Nakonec použijete centrum monitorování v synapse Studiu k monitorování spuštění kanálu, získání ID spuštění a vyhledání odpovídajících souborů uložených v datovém jezeře.

Informace o Apache Sparku a poznámkových blocích

Apache Spark je architektura paralelního zpracování, která podporuje zpracování v paměti za účelem zvýšení výkonu analytických aplikací pro velké objemy dat. Apache Spark ve službě Azure Synapse Analytics je jednou z implementací Apache Sparku v cloudu od Microsoftu.

Poznámkový blok Apache Spark v Synapse Studiu je webové rozhraní, které umožňuje vytvářet soubory, které obsahují živý kód, vizualizace a text vyprávění. Poznámkové bloky jsou vhodným místem pro ověřování nápadů a rychlé experimenty, které vám pomohou získat poznatky z dat. Poznámkové bloky se také běžně používají při přípravě dat, vizualizaci dat, strojovém učení a dalších scénářích pro velké objemy dat.

Vytvoření poznámkového bloku Synapse Spark

Předpokládejme, že jste ve službě Synapse Analytics vytvořili mapování toku dat pro zpracování, připojení a import dat profilů uživatelů. Teď chcete najít prvních pět produktů pro každého uživatele podle toho, které produkty jsou preferované a které jsou volbou nejlépe, a v posledních 12 měsících máte nejvíce nákupů. Pak chcete vypočítat prvních pět produktů celkem.

V tomto cvičení vytvoříte poznámkový blok Synapse Spark, který tyto výpočty provede.

  1. Otevřete Synapse Analytics Studio (https://web.azuresynapse.net/) a přejděte do datového centra.

    Položka nabídky Data je zvýrazněná.

  2. Vyberte kartu Propojená (1) a rozbalte primární účet Data Lake Storage (2) pod Azure Data Lake Storage Gen2. Vyberte kontejner wwi-02 (3) a otevřete složku top-products (4). Klikněte pravým tlačítkem na libovolný soubor Parquet (5), vyberte položku nabídky Nový poznámkový blok (6) a pak vyberte Načíst do datového rámce (7). Pokud složku nevidíte, vyberte Refresh.

    Zvýrazní se soubor Parquet a nová možnost poznámkového bloku.

  3. Ujistěte se, že je poznámkový blok připojený k vašemu fondu Sparku.

    Položka nabídky Připojit k fondu Sparku je zvýrazněná.

  4. Nahraďte název souboru Parquet (*.parquet1) a vyberte všechny soubory Parquet ve top-products složce. Například cesta by měla být podobná: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    Název souboru je zvýrazněný.

  5. Výběrem možnosti Spustit vše na panelu nástrojů poznámkového bloku spusťte poznámkový blok.

    Zobrazí se výsledky buněk.

    Poznámka:

    Při prvním spuštění poznámkového bloku ve fondu Sparku vytvoří Synapse novou relaci. Může to trvat přibližně 3 až 5 minut.

    Poznámka:

    Pokud chcete spustit jenom buňku, najeďte myší na buňku a vyberte ikonu Spustit buňku vlevo od buňky, nebo buňku vyberte a pak stiskněte Ctrl+Enter.

  6. Výběrem tlačítka a výběrem položky buňky Kód vytvořte novou buňku pod + ní. Tlačítko + se nachází pod buňkou poznámkového bloku vlevo. Alternativně můžete také rozbalit nabídku + Buňka na panelu nástrojů Poznámkový blok a vybrat položku kód buňky .

    Je zvýrazněná možnost nabídky Přidat kód.

  7. Spuštěním následujícího příkazu v nové buňce naplňte nový datový rámec s názvem topPurchases, vytvořte nové dočasné zobrazení s názvem top_purchasesa zobrazte prvních 100 řádků:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Výstup by měl vypadat zhruba takto:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Spuštěním následujícího příkazu v nové buňce vytvořte nové dočasné zobrazení pomocí SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Poznámka:

    Pro tento dotaz neexistuje žádný výstup.

    Dotaz používá top_purchases dočasné zobrazení jako zdroj a použije metodu row_number() over pro použití čísla řádku pro záznamy pro každého uživatele, kde ItemsPurchasedLast12Months je největší. Klauzule where vyfiltruje výsledky, takže načteme až pět produktů, u kterých jsou obě IsTopProduct hodnoty IsPreferredProduct nastaveny na true. To nám dává prvních pět nejčastěji zakoupených produktů pro každého uživatele, kde jsou tyto produkty také identifikovány jako oblíbené produkty podle jejich profilu uživatele uloženého ve službě Azure Cosmos DB.

  9. Spuštěním následujícího příkazu v nové buňce vytvořte a zobrazte nový datový rámec, který ukládá výsledky dočasného top_5_products zobrazení, které jste vytvořili v předchozí buňce:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Měl by se zobrazit výstup podobný následujícímu, který zobrazuje prvních pět upřednostňovaných produktů pro jednotlivé uživatele:

    Prvních pět upřednostňovaných produktů se zobrazí pro jednotlivé uživatele.

  10. Vypočítejte pět nejlepších produktů celkem na základě těch, které zákazníci preferují a koupili nejvíce. Uděláte to tak, že v nové buňce spustíte následující příkaz:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    V této buňce jsme seskupili prvních pět upřednostňovaných produktů podle ID produktu, seskupili jsme celkový počet zakoupených položek za posledních 12 měsíců, seřadili tuto hodnotu v sestupném pořadí a vrátili prvních pět výsledků. Výstup by měl vypadat přibližně takto:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Vytvoření buňky parametru

Kanály Azure Synapse vyhledá buňku parametrů a považují tuto buňku za výchozí hodnoty parametrů předaných v době provádění. Prováděcí modul přidá novou buňku pod buňku parametrů se vstupními parametry pro přepsání výchozích hodnot. Pokud není buňka parametrů určená, vložená buňka se vloží do horní části poznámkového bloku.

  1. Tento poznámkový blok spustíme z kanálu. Chceme předat parametr, který nastaví runId hodnotu proměnné, která se použije k pojmenování souboru Parquet. V nové buňce spusťte následující příkaz:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    K vygenerování náhodného identifikátoru GUID používáme knihovnu uuid , která je součástí Sparku. Chceme přepsat runId proměnnou parametrem předaným kanálem. K tomu musíme tuto funkci přepnout jako buňku parametru.

  2. Vyberte akce se třemi tečkami (...) v pravém horním rohu buňky (1) a pak vyberte Přepnout buňku parametru (2).

    Položka nabídky je zvýrazněná.

    Po přepnutí této možnosti se v buňce zobrazí značka Parametry .

    Buňka je nakonfigurovaná tak, aby přijímala parametry.

  3. Do nové buňky vložte následující kód, který použije runId proměnnou jako název souboru Parquet v /top5-products/ cestě v primárním účtu data lake. Nahraďte YOUR_DATALAKE_NAME cestu názvem vašeho primárního účtu Data Lake. Chcete-li to najít, posuňte se nahoru na buňku 1 v horní části stránky (1). Zkopírujte účet data Lake Storage z cesty (2). Tuto hodnotu vložte jako náhradu YOUR_DATALAKE_NAME do cesty (3) uvnitř nové buňky a potom spusťte příkaz v buňce.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Cesta se aktualizuje názvem primárního účtu Data Lake.

  4. Ověřte, že byl soubor zapsán do datového jezera. Přejděte do datového centra a vyberte kartu Propojená (1). Rozbalte primární účet data lake storage a pak vyberte kontejner wwi-02 (2). Přejděte do složky top5-products (3). V adresáři by se měla zobrazit složka pro soubor Parquet s identifikátorem GUID jako názvem souboru (4).

    Soubor parquet je zvýrazněný.

    Metoda zápisu Parquet do datového rámce v buňce Poznámkový blok vytvořila tento adresář, protože předtím neexistovala.

Přidání poznámkového bloku do kanálu Synapse

Pokud se chcete vrátit k Tok dat mapování, které jsme popsali na začátku cvičení, předpokládejme, že chcete tento poznámkový blok spustit po spuštění Tok dat jako součást procesu orchestrace. Uděláte to tak, že tento poznámkový blok přidáte do kanálu jako novou aktivitu poznámkového bloku.

  1. Vraťte se do poznámkového bloku. V pravém horním rohu poznámkového bloku vyberte Vlastnosti (1) a zadejte Calculate Top 5 Products název (2).

    Zobrazí se okno vlastností.

  2. V pravém horním rohu poznámkového bloku vyberte Přidat do kanálu (1) a pak vyberte Existující kanál (2).

    Tlačítko Přidat do kanálu je zvýrazněné.

  3. Vyberte data v profilu uživatele do kanálu ASA (1) a pak vyberte Přidat *(2).

    Je vybrán kanál.

  4. Synapse Studio přidá aktivitu poznámkového bloku do kanálu. Změňte uspořádání aktivity poznámkového bloku tak, aby se nachází napravo od aktivity toku dat. Vyberte aktivitu toku dat a přetáhněte zelené pole připojení kanálu aktivity úspěch do aktivity poznámkového bloku.

    Zelená šipka je zvýrazněná.

    Šipka aktivity Úspěch dává kanálu pokyn ke spuštění aktivity poznámkového bloku po úspěšném spuštění aktivity toku dat.

  5. Vyberte aktivitu poznámkového bloku (1) a pak vyberte kartu Nastavení (2), rozbalte základní parametry (3) a pak vyberte + Nový (4). Zadejte runId do pole Název (5). Vyberte řetězec pro typ (6). Jako hodnotu vyberte Přidat dynamický obsah (7).

    Zobrazí se nastavení.

  6. V části Systémové proměnné (1) vyberte ID spuštění kanálu. Tím se přidá @pipeline().RunId do pole dynamického obsahu (2). Dialogové okno zavřete výběrem možnosti Dokončit (3 ).

    Zobrazí se formulář dynamického obsahu.

    Hodnota ID spuštění kanálu je jedinečný identifikátor GUID přiřazený ke každému spuštění kanálu. Tuto hodnotu použijeme pro název souboru Parquet předáním této hodnoty jako runId parametr Poznámkový blok. Pak se můžeme podívat do historie spuštění kanálu a vyhledat konkrétní soubor Parquet vytvořený pro každé spuštění kanálu.

  7. Výběrem možnosti Publikovat vše a publikovat uložte provedené změny.

    Možnost Publikovat vše je zvýrazněná.

  8. Po dokončení publikování vyberte Přidat trigger (1) a pak trigger (2) a spusťte aktualizovaný kanál.

    Položka nabídky triggeru je zvýrazněná.

  9. Spuštěním triggeru vyberte OK .

    Tlačítko OK je zvýrazněné.

Monitorování spuštění kanálu

Centrum monitorování umožňuje monitorovat aktuální a historické aktivity pro SQL, Apache Spark a Pipelines.

  1. Přejděte do centra monitorování .

    Je vybrána položka nabídky Centrum monitorování.

  2. Vyberte spuštění kanálu (1) a počkejte na úspěšné dokončení spuštění kanálu (2). Možná budete muset zobrazení aktualizovat (3 ).

    Spuštění kanálu bylo úspěšné.

  3. Výběrem názvu kanálu zobrazíte spuštění aktivit kanálu.

    Je vybrán název kanálu.

  4. Všimněte si aktivity toku dat i nové aktivity poznámkového bloku (1). Poznamenejte si hodnotu ID spuštění kanálu (2). Porovnáme to s názvem souboru Parquet vygenerovaným poznámkový blok. Výběrem názvu poznámkového bloku Pro výpočet prvních 5 produktů zobrazíte podrobnosti (3).

    Zobrazí se podrobnosti o spuštění kanálu.

  5. Tady vidíme podrobnosti o spuštění poznámkového bloku. Výběrem možnosti Přehrávání (1) můžete sledovat přehrávání průběhu úloh (2). V dolní části můžete zobrazit diagnostiku a protokoly s různými možnostmi filtru (3). Napravo můžeme zobrazit podrobnosti o spuštění, jako je doba trvání, ID Livy, podrobnosti o fondu Sparku atd. Výběrem odkazu Zobrazit podrobnosti úlohy zobrazíte jeho podrobnosti (5).

    Zobrazí se podrobnosti o spuštění.

  6. Uživatelské rozhraní aplikace Spark se otevře na nové kartě, kde uvidíme podrobnosti fáze. Rozbalte vizualizaci DAG a zobrazte podrobnosti fáze.

    Zobrazí se podrobnosti fáze Sparku.

  7. Vraťte se do datového centra.

    Datové centrum.

  8. Vyberte kartu Propojená (1) a pak vyberte kontejner wwi-02 (2) v primárním účtu úložiště Data Lake Storage, přejděte do složky top5-products (3) a ověřte, že složka existuje pro soubor Parquet, jehož název odpovídá ID spuštění kanálu.

    Soubor je zvýrazněný.

    Jak vidíte, máme soubor, jehož název odpovídá ID spuštění kanálu, které jsme si předtím poznamenali:

    Id spuštění kanálu je zvýrazněné.

    Tyto hodnoty se shodují, protože jsme předali ID runId spuštění kanálu parametru aktivity poznámkového bloku.