Megosztás a következőn keresztül:


Apache Spark-feladatok optimalizálása az Azure Synapse Analyticsben

Megtudhatja, hogyan optimalizálhat egy Apache Spark-fürtkonfigurációt az adott számítási feladathoz. A leggyakoribb kihívás a memória leterheltsége, amelynek okai lehetnek a nem megfelelő konfigurációk (különösen a nem megfelelő méretű végrehajtók), a hosszan futó műveletek, valamint a Descartes-műveleteket eredményező feladatok. Felgyorsíthatja a feladatokat a megfelelő gyorsítótárazással, valamint az adateltérés engedélyezésével. A legjobb teljesítmény érdekében monitorozza és tekintse át a hosszú ideig futó és erőforrás-használatú Spark-feladatok végrehajtását.

A következő szakaszok a Spark-feladatok gyakori optimalizálását és javaslatait ismertetik.

Az adat absztrakciójának kiválasztása

A korábbi Spark-verziók RDD-kkel absztrakciós adatokat, a Spark 1.3-at és az 1.6-os bevezetett adatkereteket és adatkészleteket használnak. Vegye figyelembe a következő relatív érdemeket:

  • Adatkeretek
    • A legjobb választás a legtöbb helyzetben.
    • Lekérdezésoptimalizálást biztosít a Catalysten keresztül.
    • Teljes körű kódlétrehozás.
    • Közvetlen memóriahozzáférés.
    • Alacsony szemétgyűjtési (GC) többletterhelés.
    • Nem olyan fejlesztőbarát, mint az Adathalmazok, mivel nincs fordítási idő ellenőrzése vagy tartományobjektum-programozás.
  • Adatkészletek
    • Olyan összetett ETL-folyamatokban jó, ahol a teljesítményre gyakorolt hatás elfogadható.
    • Nem jó az olyan összesítésekben, ahol a teljesítményre gyakorolt hatás jelentős lehet.
    • Lekérdezésoptimalizálást biztosít a Catalysten keresztül.
    • Fejlesztőbarát tartományi objektumok programozásával és fordítási idő ellenőrzésével.
    • Szerializálási/deszerializálási többletterhelést ad hozzá.
    • Magas GC-többletterhelés.
    • Megszakítja a teljes fázisú kódgenerálást.
  • RDD-k
    • Nem kell RDD-ket használnia, hacsak nem kell új egyéni RDD-t létrehoznia.
    • Nincs lekérdezésoptimalizálás a Katalizátoron keresztül.
    • Nincs teljes fázisú kódgenerálás.
    • Magas GC-többletterhelés.
    • Spark 1.x örökölt API-kat kell használnia.

Optimális adatformátum használata

A Spark számos formátumot támogat, például csv, json, xml, parquet, orc és avro. A Spark kiterjeszthető további formátumok külső adatforrásokkal való támogatásához – további információt az Apache Spark-csomagokban talál.

A teljesítmény legjobb formátuma a parquet snappy tömörítéssel, amely a Spark 2.x alapértelmezett formátuma. A parquet oszlopos formátumban tárolja az adatokat, és a Sparkban rendkívül optimalizált. Emellett a snappy tömörítés nagyobb fájlokat eredményezhet, mint mondjuk gzip tömörítést. A fájlok felosztható jellege miatt gyorsabban bontják le a fájlokat.

Gyorsítótár használata

A Spark saját natív gyorsítótárazási mechanizmusokat biztosít, amelyek különböző módszerekkel, például .persist(), .cache()és CACHE TABLE. Ez a natív gyorsítótárazás kis adathalmazok és ETL-folyamatok esetén is hatékony, ahol köztes eredményeket kell gyorsítótáraznia. A Spark natív gyorsítótárazása azonban jelenleg nem működik jól a particionálással, mivel a gyorsítótárazott táblák nem őrzik meg a particionálási adatokat.

A memória hatékony használata

A Spark az adatok memóriába helyezésével működik, így a memóriaerőforrások kezelése kulcsfontosságú szempont a Spark-feladatok végrehajtásának optimalizálásában. A fürt memóriájának hatékony használatára számos módszer alkalmazható.

  • A particionálási stratégiában előnyben részesítheti a kisebb adatpartíciókat és -fiókokat az adatmérethez, -típusokhoz és -elosztáshoz.

  • A Synapse Sparkban (Runtime 3.1 vagy újabb verzió) a Kryo-adatok szerializálása alapértelmezés szerint engedélyezve van a Kryo-adatok szerializálásával.

  • A kryoserializer pufferméretét testre szabhatja a Spark-konfigurációval a számítási feladatokra vonatkozó követelmények alapján:

    // Set the desired property
    spark.conf.set("spark.kryoserializer.buffer.max", "256m")
    
    
  • A Spark konfigurációs beállításainak monitorozása és finomhangolása.

Referenciaként a Spark memóriastruktúrája és néhány kulcs végrehajtó memóriaparamétere megjelenik a következő képen.

A Spark memóriahasználati szempontjai

Az Azure Synapse-ben az Apache Spark YARN Apache Hadoop YARN-t használ, a YARN az egyes Spark-csomópontok összes tárolója által használt memória maximális összegét szabályozza. Az alábbi ábra a fő objektumokat és azok kapcsolatait mutatja be.

YARN Spark Memóriakezelés

A "memóriakihasznált" üzenetek kezeléséhez próbálja meg a következőt:

  • Tekintse át a DAG felügyeleti sorrendet. Csökkentés térképoldali csökkentéssel, előparticionálással (vagy gyűjtőbe rendezés) a forrásadatokkal, az egyetlen shuffles maximalizálása és az elküldött adatok mennyiségének csökkentése.
  • Előnyben részesíti ReduceByKey a rögzített memóriakorlátot GroupByKey, amely aggregációkat, ablakokat és egyéb függvényeket biztosít, de határtalan memóriakorláttal rendelkezik.
  • Előnyben részesítse TreeReduce, amely több munkát végez a végrehajtókon vagy partíciókon, amellyel Reduceminden működik az illesztőprogramon.
  • Használja a DataFrame-eket az alacsonyabb szintű RDD-objektumok helyett.
  • Olyan ComplexType-okat hozhat létre, amelyek beágyazzák a műveleteket, például "Top N", különböző összesítések vagy ablakozási műveletek.

Az adatszerializálás optimalizálása

A Spark-feladatok elosztottak, így a legjobb teljesítmény érdekében fontos a megfelelő adat szerializálás. A Sparkhoz két szerializálási lehetőség áll rendelkezésre:

  • Java-szerializálás
  • A Kryo szerializálás az alapértelmezett. Ez egy újabb formátum, és gyorsabb és kompaktabb szerializálást eredményezhet, mint a Java. A Kryo megköveteli, hogy regisztrálja az osztályokat a programban, és még nem támogatja az összes szerializálható típust.

Gyűjtés használata

A gyűjtőzés hasonló az adatparticionáláshoz, de minden gyűjtő nem csak egy oszlopértéket tartalmazhat. A gyűjtők jól működnek nagy (több millió vagy több) érték, például termékazonosítók particionálására. A gyűjtők meghatározása a sor gyűjtőkulcsának kivonatolásával történik. A gyűjtőtáblák egyedi optimalizálásokat kínálnak, mivel metaadatokat tárolnak a gyűjtők és a rendezés módjáról.

Néhány speciális gyűjtőfunkció:

  • Lekérdezésoptimalizálás a metaadatok gyűjtőbe helyezése alapján.
  • Optimalizált összesítések.
  • Optimalizált illesztések.

Egyidejűleg particionálást és gyűjtőmunkát is használhat.

A csatlakozás és a véletlen sorrend optimalizálása

Ha lassú feladatokat végez egy illesztésen vagy shuffle-on, az ok valószínűleg az adateltérés, ami a feladatadatok aszimmetriája. Egy leképezési feladat például 20 másodpercet is igénybe vehet, de egy olyan feladat futtatása, amelyben az adatok össze vannak kapcsolva vagy össze vannak kapcsolva, órákat vesz igénybe. Az adateltérés javításához sózzuk a teljes kulcsot, vagy használjunk izolált sót a kulcsok csak néhány részhalmazához. Ha izolált sót használ, további szűréssel elkülönítheti a sózott kulcsok részhalmazát a térképcsatlakozásokban. Egy másik lehetőség az, hogy először egy gyűjtőoszlopot vezet be, majd előre összesíti a gyűjtőket.

A lassú illesztést okozó másik tényező lehet az illesztés típusa. Alapértelmezés szerint a Spark az SortMerge illesztés típusát használja. Ez a típusú illesztés leginkább nagy adathalmazokhoz alkalmas, de egyébként számításilag költséges, mert először az adatok bal és jobb oldalát kell rendeznie, mielőtt egyesítené őket.

Az Broadcast illesztés leginkább kisebb adathalmazokhoz alkalmas, vagy ha az illesztés egyik oldala sokkal kisebb, mint a másik oldal. Ez az illesztéstípus az összes végrehajtó egyik oldalát közvetíti, ezért általában több memóriát igényel a közvetítésekhez.

A konfigurációban módosíthatja az illesztés típusát a beállítással spark.sql.autoBroadcastJoinThreshold, vagy beállíthatja az illesztési tippet a DataFrame API-k (dataframe.join(broadcast(df2))) használatával.

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Ha gyűjtőtáblákat használ, akkor van egy harmadik illesztéstípusa, az Merge illesztés. A megfelelően előre particionált és előre rendezett adathalmazok kihagyják a drága rendezési fázist egy SortMerge illesztésből.

Az illesztések sorrendje számít, különösen az összetettebb lekérdezésekben. Kezdje a legválogatottabb illesztésekkel. Emellett áthelyezheti azokat az illesztéseket is, amelyek lehetőség szerint növelik a sorok számát az összesítések után.

A Cartesian-illesztések párhuzamosságának kezeléséhez hozzáadhat beágyazott struktúrákat, ablakokat, és kihagyhat egy vagy több lépést a Spark-feladatban.

Válassza ki a megfelelő végrehajtóméretet

A végrehajtó konfigurációjának kiválasztásakor vegye figyelembe a Java-szemétgyűjtés (GC) többletterhelését.

  • A végrehajtó méretének csökkentésére használt tényezők:

    • Csökkentse a halom méretét 32 GB alatt, hogy a GC többletterhelése < 10%legyen.
    • Csökkentse a magok számát, hogy a GC többletterhelése < 10%legyen.
  • A végrehajtó méretének növelését befolyásoló tényezők:

    • Csökkentse a végrehajtók közötti kommunikációs többletterhelést.
    • Csökkentse a nagyobb fürtökön> (100 végrehajtó) lévő végrehajtók (N2) közötti nyitott kapcsolatok számát.
    • Növelje a halomméretet a memóriaigényes feladatokhoz való igazodáshoz.
    • Nem kötelező: A végrehajtónkénti memóriaterhelés csökkentése.
    • Nem kötelező: A processzor túljegyzésével növelheti a kihasználtságot és az egyidejűséget.

A végrehajtó méretének kiválasztásakor a hüvelykujj általános szabálya:

  • Kezdjen 30 GB-tal végrehajtónként, és ossza el a rendelkezésre álló gépmagokat.
  • Növelje a nagyobb fürtök végrehajtó magjainak számát (> 100 végrehajtó).
  • Módosítsa a méretet mind a próbafuttatások, mind az előző tényezők, például a GC többletterhelése alapján.

Egyidejű lekérdezések futtatásakor vegye figyelembe a következőket:

  • Első lépésként 30 GB/végrehajtó és minden gépmag.
  • Több párhuzamos Spark-alkalmazás létrehozása a processzor túligénylésével (kb. 30%-os késés-javulás).
  • Lekérdezések elosztása párhuzamos alkalmazások között.
  • Módosítsa a méretet mind a próbafuttatások, mind az előző tényezők, például a GC többletterhelése alapján.

Az idősornézet, az SQL Graph, a feladatstatisztikák és így tovább nézetben figyelheti a lekérdezés teljesítményét a kiugró értékek vagy egyéb teljesítményproblémák esetén. Néha egy vagy néhány végrehajtó lassabb, mint a többi, és a feladatok végrehajtása sokkal tovább tart. Ez gyakran előfordul nagyobb fürtökön (> 30 csomóponton). Ebben az esetben ossza fel a munkát nagyobb számú tevékenységre, hogy az ütemező kompenzálhassa a lassú tevékenységeket.

Például legalább kétszer annyi feladat van, mint az alkalmazásban lévő végrehajtó magok száma. A tevékenységek spekulatív végrehajtását is engedélyezheti a következővel conf: spark.speculation = true: .

A feladat-végrehajtás optimalizálása

  • Szükség szerint gyorsítótárazva, például ha kétszer használja az adatokat, akkor gyorsítótárazza.
  • Szórásváltozók az összes végrehajtónak. A változók csak egyszer vannak szerializálva, ami gyorsabb kereséseket eredményez.
  • Használja a szálkészletet az illesztőprogramon, ami sok feladat gyorsabb működését eredményezi.

A Spark 2.x lekérdezési teljesítményének kulcsa a volfrámmotor, amely a teljes fázisú kódgenerálástól függ. Bizonyos esetekben a teljes fázisú kódlétrehozás le lehet tiltva.

Ha például nem mutable típusú (string) típust használ az aggregációs kifejezésben, SortAggregate akkor ahelyett HashAggregate, hogy . A jobb teljesítmény érdekében például próbálkozzon a következő lépésekkel, majd engedélyezze újra a kódgenerálást:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Következő lépések