Gyakorlat – Jegyzetfüzet integrálása az Azure Synapse Pipelinesban
Ebben a leckében egy Azure Synapse Spark-jegyzetfüzetet hoz létre a leképezési adatfolyam által betöltött adatok elemzéséhez és átalakításához, valamint az adatok data lake-ben való tárolásához. Létrehoz egy paramétercellát, amely elfogad egy sztringparamétert, amely meghatározza a jegyzetfüzet által a data lake-be írt adatok mappanevét.
Ezután hozzáadja ezt a jegyzetfüzetet egy Synapse-folyamathoz, és átadja az egyedi folyamatfuttatási azonosítót a jegyzetfüzet paraméterének, hogy később korrelálni tudja a folyamatot a jegyzetfüzettevékenység által mentett adatokkal.
Végül a Synapse Studióban a Monitor hub használatával figyelheti a folyamatfuttatást, lekérte a futtatási azonosítót, majd megkeresheti a data lake-ben tárolt megfelelő fájlokat.
Az Apache Spark és a jegyzetfüzetek bemutatása
Az Apache Spark egy párhuzamos feldolgozási keretrendszer, amely támogatja a memórián belüli feldolgozást a big data elemzési alkalmazások teljesítményének növelése érdekében. Az Azure Synapse Analyticsben üzemelő Apache Spark az Apache Spark egyik felhőbeli megvalósítása a Microsofttól.
Az Apache Spark-jegyzetfüzetek a Synapse Studióban egy webes felület, amellyel élő kódot, vizualizációkat és kísérőszövegeket tartalmazó fájlokat hozhat létre. A notebookok kitűnően alkalmasak az ötletek kipróbálására és gyors kísérletek elvégzésére, amelyekkel megállapításokat tehet az adatokról. A jegyzetfüzeteket széles körben használják az adatok előkészítésében, az adatvizualizációban, a gépi tanulásban és más Big Data-forgatókönyvekben is.
Synapse Spark-jegyzetfüzet létrehozása
Tegyük fel, hogy létrehozott egy leképezési adatfolyamot a Synapse Analyticsben a felhasználói profiladatok feldolgozásához, csatlakoztatásához és importálásához. Most meg szeretné találni az öt legjobb terméket minden felhasználó számára, attól függően, hogy melyikeket részesíti előnyben, és melyik a legjobb választás, és az elmúlt 12 hónapban a legtöbb vásárlást kapta. Ezután ki szeretné számítani az öt legjobb terméket.
Ebben a gyakorlatban létrehoz egy Synapse Spark-jegyzetfüzetet a számítások elvégzéséhez.
Nyissa meg a Synapse Analytics Studiót (https://web.azuresynapse.net/), és nyissa meg a Data Hubot.
Válassza a Csatolt lapot (1), és bontsa ki az elsődleges Data Lake Storage-fiókot (2) az Azure Data Lake Storage Gen2 alatt. Válassza ki a wwi-02 tárolót (3),és nyissa meg a top-products mappát (4).> Kattintson a jobb gombbal bármelyik Parquet-fájlra (5),válassza az Új jegyzetfüzet menüelemet (6),majd válassza a Betöltés a DataFrame-be (7) lehetőséget. Ha nem látja a mappát, válassza a lehetőséget
Refresh
.Győződjön meg arról, hogy a jegyzetfüzet csatlakoztatva van a Spark-készlethez.
Cserélje le a Parquet-fájl nevét
*.parquet
(1) atop-products
mappában lévő összes Parquet-fájl kijelöléséhez. Az elérési útnak például a következőhöz hasonlónak kell lennie:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.A jegyzetfüzet végrehajtásához válassza az Összes futtatása a jegyzetfüzet eszköztárán lehetőséget.
Feljegyzés
Amikor először futtat jegyzetfüzetet egy Spark-készletben, a Synapse létrehoz egy új munkamenetet. Ez körülbelül 3-5 percet vehet igénybe.
Feljegyzés
Ha csak a cellát szeretné futtatni, mutasson a cellára, és válassza a Cella futtatása ikont a cella bal oldalán, vagy jelölje ki a cellát, majd írja be a Ctrl+Enter billentyűkombinációt.
Hozzon létre egy új cellát alatta a + gomb kiválasztásával, majd a Kód cellaelem kiválasztásával. A + gomb a bal oldali jegyzetfüzetcella alatt található. Másik lehetőségként a Jegyzetfüzet eszköztár + Cella menüjét is kibonthatja, és kiválaszthatja a Kód cellaelemet.
Futtassa a következő parancsot az új cellában egy új, úgynevezett
topPurchases
adatkeret feltöltéséhez, hozzon létre egy új ideiglenes nézetet, éstop_purchases
jelenítse meg az első 100 sort:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
A kimenetnek a következőképpen kell kinéznie:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Futtassa a következő parancsot egy új cellában egy új ideiglenes nézet sql használatával történő létrehozásához:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Feljegyzés
Ehhez a lekérdezéshez nincs kimenet.
A lekérdezés az
top_purchases
ideiglenes nézetet használja forrásként, és egyrow_number() over
metódust alkalmaz egy sorszám alkalmazásához minden olyan felhasználó rekordjaihoz, aholItemsPurchasedLast12Months
a legnagyobb. Awhere
záradék szűri az eredményeket, így legfeljebb öt terméket kérünk le, ahol mindkettőIsTopProduct
IsPreferredProduct
igaz. Ez minden felhasználó számára az első öt leggyakrabban vásárolt terméket adja meg, ahol a rendszer az Azure Cosmos DB-ben tárolt felhasználói profiljuknak megfelelően a kedvenc termékeként is azonosítja őket.Futtassa a következő parancsot egy új cellában egy új DataFrame létrehozásához és megjelenítéséhez, amely az előző cellában létrehozott ideiglenes nézet eredményeit
top_5_products
tárolja:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
A következőhöz hasonló kimenetnek kell megjelennie, amely felhasználónként az öt legfontosabb terméket jeleníti meg:
Az ügyfelek által előnyben részesített és a legtöbbet vásárolt termékek alapján számítsa ki az első öt terméket. Ehhez futtassa a következő parancsot egy új cellában:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
Ebben a cellában az első öt előnyben részesített terméket termékazonosító szerint csoportosítottuk, összesítettük az elmúlt 12 hónapban vásárolt összes tételt, csökkenő sorrendbe rendeztük ezt az értéket, és visszaadtuk az első öt találatot. A kimenet az alábbihoz hasonló lesz:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Paramétercella létrehozása
Az Azure Synapse-folyamatok megkeresik a paraméterek celláját, és ezt a cellát alapértelmezettként kezelik a végrehajtási időpontban átadott paraméterekhez. A végrehajtási motor egy új cellát ad hozzá a paramétercella alá bemeneti paraméterekkel az alapértelmezett értékek felülírásához. Ha nincs kijelölve paramétercella, a program a beszúrt cellát a jegyzetfüzet tetején helyezi el.
Ezt a jegyzetfüzetet egy folyamatból fogjuk végrehajtani. Olyan paramétert szeretnénk átadni, amely beállít egy
runId
változóértéket, amely a Parquet-fájl elnevezésére szolgál. Futtassa a következő parancsot egy új cellában:import uuid # Generate random GUID runId = uuid.uuid4()
A Sparkhoz kapcsolódó kódtárat használjuk
uuid
egy véletlenszerű GUID létrehozásához. Felül szeretnénk bírálni arunId
változót a folyamat által átadott paraméterrel. Ehhez ezt paramétercelláként kell kapcsolnunk.Jelölje ki a műveletek három pontját (...) a cella (1) jobb felső sarkában, majd válassza a Paramétercella váltása (2) lehetőséget.
A beállítás összesítése után megjelenik a Paraméterek címke a cellán.
Illessze be a következő kódot egy új cellába, hogy parquet-fájlnévként használja a
runId
változót az/top5-products/
elsődleges Data Lake-fiók elérési útjában. Cserélje leYOUR_DATALAKE_NAME
az elérési utat az elsődleges data lake-fiók nevére. Ennek megkereséséhez görgessen fel az 1 . cellához az oldal tetején (1). Másolja ki a Data Lake Storage-fiókot a (2) elérési útból. Illessze be ezt az értéket az új cellába az elérési út (3) helyéreYOUR_DATALAKE_NAME
, majd futtassa a parancsot a cellában.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Ellenőrizze, hogy a fájl a data lake-be lett-e írva. Nyissa meg az Adatközpontot , és válassza a Csatolt lap (1) lehetőséget. Bontsa ki az elsődleges Data Lake Storage-fiókot, majd válassza ki a wwi-02 tárolót (2). Nyissa meg a top5-products mappát (3). A könyvtárban meg kell jelennie a Parquet-fájl mappájának, amelynek fájlneve GUID (4).
A Jegyzetfüzet cellában lévő adatkeret parquet írási metódusa azért hozta létre ezt a könyvtárat, mert korábban nem létezett.
Jegyzetfüzet hozzáadása Synapse-folyamathoz
A gyakorlat elején ismertetett leképezési Adatfolyam hivatkozva tegyük fel, hogy a jegyzetfüzetet a vezénylési folyamat részeként futtatott Adatfolyam után szeretné végrehajtani. Ehhez új jegyzetfüzet-tevékenységként adja hozzá ezt a jegyzetfüzetet egy folyamathoz.
Térjen vissza a jegyzetfüzethez. A jegyzetfüzet jobb felső sarkában válassza a Tulajdonságok (1) lehetőséget, majd adja meg
Calculate Top 5 Products
a Név (2) értéket.A jegyzetfüzet jobb felső sarkában válassza a Hozzáadás a folyamathoz (1), majd a Meglévő folyamat (2) lehetőséget.
Válassza ki a Felhasználói profil adatainak írása ASA-folyamatba (1), majd a *(2) hozzáadása lehetőséget.
A Synapse Studio hozzáadja a jegyzetfüzet-tevékenységet a folyamathoz. Átrendezheti a jegyzetfüzet-tevékenységet , hogy az az adatfolyam-tevékenység jobb oldalán legyen. Jelölje ki az adatfolyam-tevékenységet, és húzza a Sikeres tevékenység folyamat kapcsolat zöld mezőjét a Jegyzetfüzet tevékenységhez.
A Siker tevékenység nyíl arra utasítja a folyamatot, hogy futtassa a jegyzetfüzet-tevékenységet az adatfolyam-tevékenység sikeres futtatása után.
Válassza ki a Jegyzetfüzet tevékenységet (1),majd a Beállítások lapot (2), bontsa ki az Alapparaméterek (3), majd az + Új (4) lehetőséget. Írja be
runId
a Név mezőbe (5). Válassza ki a Típus (6) sztringet. Az Érték mezőben válassza a Dinamikus tartalom hozzáadása (7) lehetőséget.Válassza ki a folyamatfuttatás azonosítóját a rendszerváltozók (1) alatt. Ez hozzáadja
@pipeline().RunId
a dinamikus tartalommezőhöz (2). A párbeszédpanel bezárásához válassza a Befejezés (3) lehetőséget.A folyamatfuttatás azonosítója minden egyes folyamatfuttatáshoz egyedi GUID-azonosító. Ezt az értéket a Parquet-fájl nevére fogjuk használni, ha ezt az
runId
értéket notebook paraméterként adjuk át. Ezután áttekinthetjük a folyamat futtatási előzményeit, és megkereshetjük az egyes folyamatok futtatásához létrehozott konkrét Parquet-fájlt.A módosítások mentéséhez válassza az Összes közzététele, majd a Közzététel lehetőséget.
A közzététel befejezése után válassza az Eseményindító hozzáadása (1), majd az Eseményindító (2) lehetőséget a frissített folyamat futtatásához.
Az eseményindító futtatásához válassza az OK gombot .
A folyamat futásának monitorozása
A Monitor-központ lehetővé teszi az SQL, az Apache Spark és a Pipelines aktuális és korábbi tevékenységeinek monitorozását.
Lépjen a Monitor hubra.
Válassza ki a Folyamatfuttatások (1) lehetőséget, és várja meg, amíg a folyamat futtatása sikeresen befejeződik (2). Előfordulhat, hogy frissítenie kell (3) a nézetet.
Válassza ki a folyamat nevét a folyamat tevékenységfuttatásainak megtekintéséhez.
Figyelje meg az adatfolyam-tevékenységet és az új jegyzetfüzettevékenységet (1). Jegyezze fel a folyamat futásazonosítójának értékét (2).. Ezt összehasonlítjuk a jegyzetfüzet által létrehozott Parquet-fájl nevével. Válassza az 5. termékjegyzetfüzet nevének kiszámítása lehetőséget a részletek megtekintéséhez (3).< />
Itt a jegyzetfüzet futtatási adatait láthatjuk. A Lejátszás (1) lehetőséget választva megtekintheti az előrehaladás lejátszását a feladatokon (2). Alul megtekintheti a Diagnosztikát és a Naplókat különböző szűrési lehetőségekkel (3).. A jobb oldalon megtekintheti a futtatás részleteit, például az időtartamot, a Livy-azonosítót, a Spark-készlet részleteit stb. A feladat részleteinek megtekintéséhez kattintson a Részletek megtekintése hivatkozásra (5).
A Spark-alkalmazás felhasználói felülete megnyílik egy új lapon, ahol megtekintheti a szakasz részleteit. Bontsa ki a DAG-vizualizációt a szakasz részleteinek megtekintéséhez.
Lépjen vissza a Data Hubra.
Válassza a Csatolt lapot (1),majd válassza ki a wwi-02 tárolót (2) az elsődleges Data Lake Storage-fiókban, lépjen a top5-products mappára (3),és ellenőrizze, hogy létezik-e mappa a Parquet-fájlhoz, amelynek a neve megegyezik a folyamatfuttatás azonosítójával.
Mint látható, van egy fájlunk, amelynek a neve megegyezik a korábban feljegyzett folyamatfuttatási azonosítóval :
Ezek az értékek azért egyeznek, mert a folyamatfuttatási azonosítóban átadtuk a
runId
jegyzetfüzet-tevékenység paraméterének.