Gyakorlat – Jegyzetfüzet integrálása az Azure Synapse Pipelinesban

Befejeződött

Ebben a leckében egy Azure Synapse Spark-jegyzetfüzetet hoz létre a leképezési adatfolyam által betöltött adatok elemzéséhez és átalakításához, valamint az adatok data lake-ben való tárolásához. Létrehoz egy paramétercellát, amely elfogad egy sztringparamétert, amely meghatározza a jegyzetfüzet által a data lake-be írt adatok mappanevét.

Ezután hozzáadja ezt a jegyzetfüzetet egy Synapse-folyamathoz, és átadja az egyedi folyamatfuttatási azonosítót a jegyzetfüzet paraméterének, hogy később korrelálni tudja a folyamatot a jegyzetfüzettevékenység által mentett adatokkal.

Végül a Synapse Studióban a Monitor hub használatával figyelheti a folyamatfuttatást, lekérte a futtatási azonosítót, majd megkeresheti a data lake-ben tárolt megfelelő fájlokat.

Az Apache Spark és a jegyzetfüzetek bemutatása

Az Apache Spark egy párhuzamos feldolgozási keretrendszer, amely támogatja a memórián belüli feldolgozást a big data elemzési alkalmazások teljesítményének növelése érdekében. Az Azure Synapse Analyticsben üzemelő Apache Spark az Apache Spark egyik felhőbeli megvalósítása a Microsofttól.

Az Apache Spark-jegyzetfüzetek a Synapse Studióban egy webes felület, amellyel élő kódot, vizualizációkat és kísérőszövegeket tartalmazó fájlokat hozhat létre. A notebookok kitűnően alkalmasak az ötletek kipróbálására és gyors kísérletek elvégzésére, amelyekkel megállapításokat tehet az adatokról. A jegyzetfüzeteket széles körben használják az adatok előkészítésében, az adatvizualizációban, a gépi tanulásban és más Big Data-forgatókönyvekben is.

Synapse Spark-jegyzetfüzet létrehozása

Tegyük fel, hogy létrehozott egy leképezési adatfolyamot a Synapse Analyticsben a felhasználói profiladatok feldolgozásához, csatlakoztatásához és importálásához. Most meg szeretné találni az öt legjobb terméket minden felhasználó számára, attól függően, hogy melyikeket részesíti előnyben, és melyik a legjobb választás, és az elmúlt 12 hónapban a legtöbb vásárlást kapta. Ezután ki szeretné számítani az öt legjobb terméket.

Ebben a gyakorlatban létrehoz egy Synapse Spark-jegyzetfüzetet a számítások elvégzéséhez.

  1. Nyissa meg a Synapse Analytics Studiót (https://web.azuresynapse.net/), és nyissa meg a Data Hubot.

    Az Adatok menüelem ki van emelve.

  2. Válassza a Csatolt lapot (1), és bontsa ki az elsődleges Data Lake Storage-fiókot (2) az Azure Data Lake Storage Gen2 alatt. Válassza ki a wwi-02 tárolót (3),és nyissa meg a top-products mappát (4).> Kattintson a jobb gombbal bármelyik Parquet-fájlra (5),válassza az Új jegyzetfüzet menüelemet (6),majd válassza a Betöltés a DataFrame-be (7) lehetőséget. Ha nem látja a mappát, válassza a lehetőséget Refresh.

    A Parquet-fájl és az új jegyzetfüzet lehetőség ki van emelve.

  3. Győződjön meg arról, hogy a jegyzetfüzet csatlakoztatva van a Spark-készlethez.

    A Spark-készlethez csatolás menüelem ki van emelve.

  4. Cserélje le a Parquet-fájl nevét *.parquet (1) a top-products mappában lévő összes Parquet-fájl kijelöléséhez. Az elérési útnak például a következőhöz hasonlónak kell lennie: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    A fájlnév ki van emelve.

  5. A jegyzetfüzet végrehajtásához válassza az Összes futtatása a jegyzetfüzet eszköztárán lehetőséget.

    A cellaeredmények megjelennek.

    Feljegyzés

    Amikor először futtat jegyzetfüzetet egy Spark-készletben, a Synapse létrehoz egy új munkamenetet. Ez körülbelül 3-5 percet vehet igénybe.

    Feljegyzés

    Ha csak a cellát szeretné futtatni, mutasson a cellára, és válassza a Cella futtatása ikont a cella bal oldalán, vagy jelölje ki a cellát, majd írja be a Ctrl+Enter billentyűkombinációt.

  6. Hozzon létre egy új cellát alatta a + gomb kiválasztásával, majd a Kód cellaelem kiválasztásával. A + gomb a bal oldali jegyzetfüzetcella alatt található. Másik lehetőségként a Jegyzetfüzet eszköztár + Cella menüjét is kibonthatja, és kiválaszthatja a Kód cellaelemet.

    A Kód hozzáadása menü ki van emelve.

  7. Futtassa a következő parancsot az új cellában egy új, úgynevezett topPurchasesadatkeret feltöltéséhez, hozzon létre egy új ideiglenes nézetet, és top_purchasesjelenítse meg az első 100 sort:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    A kimenetnek a következőképpen kell kinéznie:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Futtassa a következő parancsot egy új cellában egy új ideiglenes nézet sql használatával történő létrehozásához:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Feljegyzés

    Ehhez a lekérdezéshez nincs kimenet.

    A lekérdezés az top_purchases ideiglenes nézetet használja forrásként, és egy row_number() over metódust alkalmaz egy sorszám alkalmazásához minden olyan felhasználó rekordjaihoz, ahol ItemsPurchasedLast12Months a legnagyobb. A where záradék szűri az eredményeket, így legfeljebb öt terméket kérünk le, ahol mindkettő IsTopProduct IsPreferredProduct igaz. Ez minden felhasználó számára az első öt leggyakrabban vásárolt terméket adja meg, ahol a rendszer az Azure Cosmos DB-ben tárolt felhasználói profiljuknak megfelelően a kedvenc termékeként is azonosítja őket.

  9. Futtassa a következő parancsot egy új cellában egy új DataFrame létrehozásához és megjelenítéséhez, amely az előző cellában létrehozott ideiglenes nézet eredményeit top_5_products tárolja:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    A következőhöz hasonló kimenetnek kell megjelennie, amely felhasználónként az öt legfontosabb terméket jeleníti meg:

    Az első öt előnyben részesített termék felhasználónként jelenik meg.

  10. Az ügyfelek által előnyben részesített és a legtöbbet vásárolt termékek alapján számítsa ki az első öt terméket. Ehhez futtassa a következő parancsot egy új cellában:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    Ebben a cellában az első öt előnyben részesített terméket termékazonosító szerint csoportosítottuk, összesítettük az elmúlt 12 hónapban vásárolt összes tételt, csökkenő sorrendbe rendeztük ezt az értéket, és visszaadtuk az első öt találatot. A kimenet az alábbihoz hasonló lesz:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Paramétercella létrehozása

Az Azure Synapse-folyamatok megkeresik a paraméterek celláját, és ezt a cellát alapértelmezettként kezelik a végrehajtási időpontban átadott paraméterekhez. A végrehajtási motor egy új cellát ad hozzá a paramétercella alá bemeneti paraméterekkel az alapértelmezett értékek felülírásához. Ha nincs kijelölve paramétercella, a program a beszúrt cellát a jegyzetfüzet tetején helyezi el.

  1. Ezt a jegyzetfüzetet egy folyamatból fogjuk végrehajtani. Olyan paramétert szeretnénk átadni, amely beállít egy runId változóértéket, amely a Parquet-fájl elnevezésére szolgál. Futtassa a következő parancsot egy új cellában:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    A Sparkhoz kapcsolódó kódtárat használjuk uuid egy véletlenszerű GUID létrehozásához. Felül szeretnénk bírálni a runId változót a folyamat által átadott paraméterrel. Ehhez ezt paramétercelláként kell kapcsolnunk.

  2. Jelölje ki a műveletek három pontját (...) a cella (1) jobb felső sarkában, majd válassza a Paramétercella váltása (2) lehetőséget.

    A menüelem ki van emelve.

    A beállítás összesítése után megjelenik a Paraméterek címke a cellán.

    A cella úgy van konfigurálva, hogy paramétereket fogadjon el.

  3. Illessze be a következő kódot egy új cellába, hogy parquet-fájlnévként használja a runId változót az /top5-products/ elsődleges Data Lake-fiók elérési útjában. Cserélje le YOUR_DATALAKE_NAME az elérési utat az elsődleges data lake-fiók nevére. Ennek megkereséséhez görgessen fel az 1 . cellához az oldal tetején (1). Másolja ki a Data Lake Storage-fiókot a (2) elérési útból. Illessze be ezt az értéket az új cellába az elérési út (3) helyéreYOUR_DATALAKE_NAME, majd futtassa a parancsot a cellában.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Az elérési út az elsődleges data lake-fiók nevével frissül.

  4. Ellenőrizze, hogy a fájl a data lake-be lett-e írva. Nyissa meg az Adatközpontot , és válassza a Csatolt lap (1) lehetőséget. Bontsa ki az elsődleges Data Lake Storage-fiókot, majd válassza ki a wwi-02 tárolót (2). Nyissa meg a top5-products mappát (3). A könyvtárban meg kell jelennie a Parquet-fájl mappájának, amelynek fájlneve GUID (4).

    A parquet fájl ki van emelve.

    A Jegyzetfüzet cellában lévő adatkeret parquet írási metódusa azért hozta létre ezt a könyvtárat, mert korábban nem létezett.

Jegyzetfüzet hozzáadása Synapse-folyamathoz

A gyakorlat elején ismertetett leképezési Adatfolyam hivatkozva tegyük fel, hogy a jegyzetfüzetet a vezénylési folyamat részeként futtatott Adatfolyam után szeretné végrehajtani. Ehhez új jegyzetfüzet-tevékenységként adja hozzá ezt a jegyzetfüzetet egy folyamathoz.

  1. Térjen vissza a jegyzetfüzethez. A jegyzetfüzet jobb felső sarkában válassza a Tulajdonságok (1) lehetőséget, majd adja meg Calculate Top 5 Products a Név (2) értéket.

    Megjelenik a Tulajdonságok panel.

  2. A jegyzetfüzet jobb felső sarkában válassza a Hozzáadás a folyamathoz (1), majd a Meglévő folyamat (2) lehetőséget.

    A folyamathoz való hozzáadás gomb ki van emelve.

  3. Válassza ki a Felhasználói profil adatainak írása ASA-folyamatba (1), majd a *(2) hozzáadása lehetőséget.

    A folyamat ki van jelölve.

  4. A Synapse Studio hozzáadja a jegyzetfüzet-tevékenységet a folyamathoz. Átrendezheti a jegyzetfüzet-tevékenységet , hogy az az adatfolyam-tevékenység jobb oldalán legyen. Jelölje ki az adatfolyam-tevékenységet, és húzza a Sikeres tevékenység folyamat kapcsolat zöld mezőjét a Jegyzetfüzet tevékenységhez.

    A zöld nyíl ki van emelve.

    A Siker tevékenység nyíl arra utasítja a folyamatot, hogy futtassa a jegyzetfüzet-tevékenységet az adatfolyam-tevékenység sikeres futtatása után.

  5. Válassza ki a Jegyzetfüzet tevékenységet (1),majd a Beállítások lapot (2), bontsa ki az Alapparaméterek (3), majd az + Új (4) lehetőséget. Írja be runId a Név mezőbe (5). Válassza ki a Típus (6) sztringet. Az Érték mezőben válassza a Dinamikus tartalom hozzáadása (7) lehetőséget.

    A beállítások megjelennek.

  6. Válassza ki a folyamatfuttatás azonosítóját a rendszerváltozók (1) alatt. Ez hozzáadja @pipeline().RunId a dinamikus tartalommezőhöz (2). A párbeszédpanel bezárásához válassza a Befejezés (3) lehetőséget.

    Megjelenik a dinamikus tartaloműrlap.

    A folyamatfuttatás azonosítója minden egyes folyamatfuttatáshoz egyedi GUID-azonosító. Ezt az értéket a Parquet-fájl nevére fogjuk használni, ha ezt az runId értéket notebook paraméterként adjuk át. Ezután áttekinthetjük a folyamat futtatási előzményeit, és megkereshetjük az egyes folyamatok futtatásához létrehozott konkrét Parquet-fájlt.

  7. A módosítások mentéséhez válassza az Összes közzététele, majd a Közzététel lehetőséget.

    Az összes közzététele ki van emelve.

  8. A közzététel befejezése után válassza az Eseményindító hozzáadása (1), majd az Eseményindító (2) lehetőséget a frissített folyamat futtatásához.

    Az eseményindító menüelem ki van emelve.

  9. Az eseményindító futtatásához válassza az OK gombot .

    Az OK gomb ki van emelve.

A folyamat futásának monitorozása

A Monitor-központ lehetővé teszi az SQL, az Apache Spark és a Pipelines aktuális és korábbi tevékenységeinek monitorozását.

  1. Lépjen a Monitor hubra.

    A Monitor hub menüelem van kiválasztva.

  2. Válassza ki a Folyamatfuttatások (1) lehetőséget, és várja meg, amíg a folyamat futtatása sikeresen befejeződik (2). Előfordulhat, hogy frissítenie kell (3) a nézetet.

    A folyamat futtatása sikeres volt.

  3. Válassza ki a folyamat nevét a folyamat tevékenységfuttatásainak megtekintéséhez.

    A folyamat neve ki van jelölve.

  4. Figyelje meg az adatfolyam-tevékenységet és az új jegyzetfüzettevékenységet (1). Jegyezze fel a folyamat futásazonosítójának értékét (2).. Ezt összehasonlítjuk a jegyzetfüzet által létrehozott Parquet-fájl nevével. Válassza az 5. termékjegyzetfüzet nevének kiszámítása lehetőséget a részletek megtekintéséhez (3).< />

    Megjelennek a folyamatfuttatás részletei.

  5. Itt a jegyzetfüzet futtatási adatait láthatjuk. A Lejátszás (1) lehetőséget választva megtekintheti az előrehaladás lejátszását a feladatokon (2). Alul megtekintheti a Diagnosztikát és a Naplókat különböző szűrési lehetőségekkel (3).. A jobb oldalon megtekintheti a futtatás részleteit, például az időtartamot, a Livy-azonosítót, a Spark-készlet részleteit stb. A feladat részleteinek megtekintéséhez kattintson a Részletek megtekintése hivatkozásra (5).

    Megjelennek a futtatás részletei.

  6. A Spark-alkalmazás felhasználói felülete megnyílik egy új lapon, ahol megtekintheti a szakasz részleteit. Bontsa ki a DAG-vizualizációt a szakasz részleteinek megtekintéséhez.

    Ekkor megjelennek a Spark-fázis részletei.

  7. Lépjen vissza a Data Hubra.

    Adatközpont.

  8. Válassza a Csatolt lapot (1),majd válassza ki a wwi-02 tárolót (2) az elsődleges Data Lake Storage-fiókban, lépjen a top5-products mappára (3),és ellenőrizze, hogy létezik-e mappa a Parquet-fájlhoz, amelynek a neve megegyezik a folyamatfuttatás azonosítójával.

    A fájl ki van emelve.

    Mint látható, van egy fájlunk, amelynek a neve megegyezik a korábban feljegyzett folyamatfuttatási azonosítóval :

    A folyamatfuttatás azonosítója ki van emelve.

    Ezek az értékek azért egyeznek, mert a folyamatfuttatási azonosítóban átadtuk a runId jegyzetfüzet-tevékenység paraméterének.