Megosztás a következőn keresztül:


Az Azure Cosmos DB használata az Apache Spark 3 használatával az Azure Synapse Linkben

Ebben a cikkben megtudhatja, hogyan használhatja az Azure Cosmos DB-t a Synapse Apache Spark 3 használatával. A Scala, a Python, a SparkSQL és a C# teljes körű támogatásával a Synapse Apache Spark 3 központi szerepet kap az Azure Cosmos DB-hez készült Azure Synapse Link elemzési, adatelemzési, adatelemzési és adatfeltárási forgatókönyveiben.

Az Azure Cosmos DB használata során az alábbi képességek támogatottak:

  • A Synapse Apache Spark 3 lehetővé teszi olyan adatok elemzését az Azure Cosmos DB-tárolókban, amelyek az Azure Synapse Link használatával közel valós időben vannak engedélyezve anélkül, hogy befolyásolják a tranzakciós számítási feladatok teljesítményét. Az Alábbi két lehetőség áll rendelkezésre az Azure Cosmos DB elemzési tár lekérdezéséhez a Sparkból:
    • Betöltés a Spark DataFrame-be
    • Spark-tábla létrehozása
  • A Synapse Apache Spark lehetővé teszi az adatok Azure Cosmos DB-be való betöltését is. Fontos megjegyezni, hogy az adatok mindig az Azure Cosmos DB-tárolókba kerülnek a tranzakciós tárolón keresztül. Ha a Synapse Link engedélyezve van, a rendszer automatikusan szinkronizálja az új beszúrásokat, frissítéseket és törléseket az elemzési tárba.
  • A Synapse Apache Spark a Spark strukturált streamelését is támogatja az Azure Cosmos DB-vel forrásként és fogadóként.

A következő szakaszok végigvezetik a fenti képességek szintaxisán. A Learn modulból azt is megtudhatja, hogyan kérdezheti le az Azure Cosmos DB-t az Azure Synapse Analyticshez készült Apache Spark használatával. Az Azure Synapse Analytics-munkaterület kézmozdulatai úgy lettek kialakítva, hogy az első lépésekhez egyszerű, beépített élményt nyújtson. A kézmozdulatok akkor jelennek meg, ha a jobb gombbal egy Azure Cosmos DB-tárolóra kattint a Synapse-munkaterület Adat lapján. A kézmozdulatokkal gyorsan hozhat létre kódot, és testre szabhatja azt az igényei szerint. A kézmozdulatok az adatok egy kattintással való felderítéséhez is ideálisan használhatók.

Fontos

Tisztában kell lennie az elemzési séma néhány olyan korlátozásával, amelyek az adatbetöltési műveletek váratlan viselkedéséhez vezethetnek. Például a tranzakciós sémából csak az első 1000 tulajdonság érhető el az elemzési sémában, a szóközökkel rendelkező tulajdonságok nem érhetők el stb. Ha váratlan eredményeket tapasztal, további részletekért tekintse meg az elemzési tár sémakorlátozásait .

Azure Cosmos DB-elemzési tár lekérdezése

Mielőtt megismerkedik az Azure Cosmos DB elemzési tár lekérdezésének, a Spark DataFrame-be való betöltésének és a Spark-tábla létrehozásának két lehetséges lehetőségével, érdemes megvizsgálnia a felhasználói élmény különbségeit, hogy kiválaszthatja az igényeinek megfelelő lehetőséget.

A tapasztalatbeli különbség az, hogy az Azure Cosmos DB-tároló mögöttes adatváltozásainak automatikusan tükröződniük kell-e a Sparkban végzett elemzésben. Ha egy Spark DataFrame regisztrálva van, vagy egy Spark-tábla jön létre egy tároló elemzési tárolóján, a rendszer az elemzési tárban lévő adatok aktuális pillanatképe körüli metaadatokat a Sparkba olvassa be a későbbi elemzés hatékony leküldése érdekében. Fontos megjegyezni, hogy mivel a Spark egy lusta kiértékelési szabályzatot követ, hacsak nem hív meg egy műveletet a Spark DataFrame-en, vagy nem hajt végre SparkSQL-lekérdezést a Spark-táblán, a rendszer nem olvassa be a tényleges adatokat az alapul szolgáló tároló elemzési tárolójából.

A Spark DataFrame-be való betöltéskor a beolvasott metaadatok a Spark-munkamenet teljes élettartama alatt gyorsítótárazva maradnak, így a DataFrame-en meghívott további műveletek kiértékelése a DataFrame létrehozásakor az elemzési tárba került pillanatkép alapján történik.

Ezzel szemben a Spark-táblák létrehozásakor a rendszer nem gyorsítótárazza a Sparkban az elemzési tár állapotának metaadatait, hanem újra betölti őket a Spark-táblán végrehajtott összes SparkSQL-lekérdezés végrehajtásakor.

Ezért választhat a Spark DataFrame betöltése és a Spark-táblázat létrehozása között aszerint, hogy a Spark-elemzést az elemzési tár rögzített pillanatképével vagy az elemzési tár legújabb pillanatképével összehasonlítva szeretné elvégezni.

Feljegyzés

Az Azure Cosmos DB for MongoDB-fiókok lekérdezéséhez tudjon meg többet az elemzési tár teljes hűségséma-ábrázolásáról és a használandó kiterjesztett tulajdonságnevekről.

Feljegyzés

Vegye figyelembe, hogy az alábbi parancsok mindegyike options megkülönbözteti a kis- és nagybetűket.

Betöltés a Spark DataFrame-be

Ebben a példában létrehoz egy Spark DataFrame-et, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha Spark-műveleteket invoktál a DataFrame-hez. Ez a művelet nem befolyásolja a tranzakciós tárolót.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark-tábla létrehozása

Ebben a példában létrehoz egy Spark-táblát, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha SparkSQL-lekérdezéseket invoktál a táblára. Ez a művelet nem befolyásolja a tranzakciós tárolót, és nem jár adatáthelyezéssel. Ha úgy dönt, hogy törli ezt a Spark-táblát, a rendszer nem fogja érinteni a mögöttes Azure Cosmos DB-tárolót és a megfelelő elemzési tárat.

Ez a forgatókönyv kényelmesen használható a Spark-táblák külső eszközökkel való újrafelhasználására, valamint a mögöttes adatok elérhetőségének biztosítására a futásidejű használatra.

A Spark-táblázat létrehozásához a következő szintaxis szükséges:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Feljegyzés

Ha vannak olyan forgatókönyvek, amelyekben a mögöttes Azure Cosmos DB-tároló sémája idővel megváltozik; És ha azt szeretné, hogy a frissített séma automatikusan tükrözze a Spark-tábla lekérdezéseiben, ezt úgy érheti el, hogy true a Spark-tábla beállításai között adja meg a spark.cosmos.autoSchemaMerge beállítást.

Spark DataFrame írása az Azure Cosmos DB-tárolóba

Ebben a példában egy Spark DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázison kiépített kérelemegységeket.

A Python szintaxisa a következő:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Adatfolyam-adatkeret betöltése tárolóból

Ebben a kézmozdulatban a Spark Streaming funkcióval tölt be adatokat egy tárolóból egy adatkeretbe. Az adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.

Feljegyzés

Ha külső kódtárakra szeretne hivatkozni a Synapse Apache Sparkban, itt talál további információt. Ha például Egy Spark DataFrame-et a MongoDB-hez készült Azure Cosmos DB-tárolóba szeretne betölteni, itt használhatja a Spark MongoDB-összekötőt.

DataFrame betöltése az Azure Cosmos DB-tárolóból

Ebben a példában a Spark strukturált streamelési funkciójával adatokat tölthet be egy Azure Cosmos DB-tárolóból egy Spark streamelési adatkeretbe az Azure Cosmos DB változáscsatorna funkciójával. A Spark által használt ellenőrzőpont-adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Streamelési DataFrame írása az Azure Cosmos DB-tárolóba

Ebben a példában egy Stream DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket. Ha a /localWriteCheckpointFolder mappa nincs létrehozva (az alábbi példában), a mappa automatikusan létrejön.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Következő lépések