Megosztás a következőn keresztül:


Az Azure Cosmos DB használata az Apache Spark 2 használatával az Azure Synapse Linkben

Feljegyzés

Ha az Azure Cosmos DB-hez készült Azure Synapse Linket a Spark 3 használatával szeretné használni, tekintse meg ezt a cikket : Azure Synapse Link for Azure Cosmos DB a Spark 3-on

Ebben a cikkben megtudhatja, hogyan használhatja az Azure Cosmos DB-t a Synapse Apache Spark 2 használatával. A Scala, a Python, a SparkSQL és a C# teljes körű támogatásával a Synapse Apache Spark központi szerepet kap az Azure Cosmos DB-hez készült Azure Synapse Link elemzési, adatelemzési, adatelemzési és adatfeltárási forgatókönyveiben.

Az Azure Cosmos DB használata során az alábbi képességek támogatottak:

  • A Synapse Apache Spark lehetővé teszi olyan adatok elemzését az Azure Cosmos DB-tárolókban, amelyek közel valós időben engedélyezve vannak az Azure Synapse Linkkel anélkül, hogy befolyásolni szeretné a tranzakciós számítási feladatok teljesítményét. Az Alábbi két lehetőség áll rendelkezésre az Azure Cosmos DB elemzési tár lekérdezéséhez a Sparkból:
    • Betöltés a Spark DataFrame-be
    • Spark-tábla létrehozása
  • A Synapse Apache Spark lehetővé teszi az adatok Azure Cosmos DB-be való betöltését is. Fontos megjegyezni, hogy az adatok mindig az Azure Cosmos DB-tárolókba kerülnek a tranzakciós tárolón keresztül. Ha a Synapse Link engedélyezve van, a rendszer automatikusan szinkronizálja az új beszúrásokat, frissítéseket és törléseket az elemzési tárba.
  • A Synapse Apache Spark a Spark strukturált streamelését is támogatja az Azure Cosmos DB-vel forrásként és fogadóként.

A következő szakaszok végigvezetik a fenti képességek szintaxisán. A Learn modulból azt is megtudhatja, hogyan kérdezheti le az Azure Cosmos DB-t az Azure Synapse Analyticshez készült Apache Spark használatával. Az Azure Synapse Analytics-munkaterület kézmozdulatai úgy lettek kialakítva, hogy az első lépésekhez egyszerű, beépített élményt nyújtson. A kézmozdulatok akkor jelennek meg, ha a jobb gombbal egy Azure Cosmos DB-tárolóra kattint a Synapse-munkaterület Adat lapján. A kézmozdulatokkal gyorsan hozhat létre kódot, és testre szabhatja azt az igényei szerint. A kézmozdulatok az adatok egy kattintással való felderítéséhez is ideálisan használhatók.

Fontos

Tisztában kell lennie az elemzési séma néhány olyan korlátozásával, amelyek az adatbetöltési műveletek váratlan viselkedéséhez vezethetnek. Például a tranzakciós sémából csak az első 1000 tulajdonság érhető el az elemzési sémában, a szóközökkel rendelkező tulajdonságok nem érhetők el stb. Ha váratlan eredményeket tapasztal, további részletekért tekintse meg az elemzési tár sémakorlátozásait .

Azure Cosmos DB-elemzési tár lekérdezése

Mielőtt megismerkedik az Azure Cosmos DB elemzési tár lekérdezésének, a Spark DataFrame-be való betöltésének és a Spark-tábla létrehozásának két lehetséges lehetőségével, érdemes megvizsgálnia a felhasználói élmény különbségeit, hogy kiválaszthatja az igényeinek megfelelő lehetőséget.

A tapasztalatbeli különbség az, hogy az Azure Cosmos DB-tároló mögöttes adatváltozásainak automatikusan tükröződniük kell-e a Sparkban végzett elemzésben. Ha egy Spark DataFrame regisztrálva van, vagy egy Spark-tábla jön létre egy tároló elemzési tárolóján, a rendszer az elemzési tárban lévő adatok aktuális pillanatképe körüli metaadatokat a Sparkba olvassa be a későbbi elemzés hatékony leküldése érdekében. Fontos megjegyezni, hogy mivel a Spark egy lusta kiértékelési szabályzatot követ, hacsak nem hív meg egy műveletet a Spark DataFrame-en, vagy nem hajt végre SparkSQL-lekérdezést a Spark-táblán, a rendszer nem olvassa be a tényleges adatokat az alapul szolgáló tároló elemzési tárolójából.

A Spark DataFrame-be való betöltéskor a beolvasott metaadatok a Spark-munkamenet teljes élettartama alatt gyorsítótárazva maradnak, így a DataFrame-en meghívott további műveletek kiértékelése a DataFrame létrehozásakor az elemzési tárba került pillanatkép alapján történik.

Ezzel szemben a Spark-táblák létrehozásakor a rendszer nem gyorsítótárazza a Sparkban az elemzési tár állapotának metaadatait, hanem újra betölti őket a Spark-táblán végrehajtott összes SparkSQL-lekérdezés végrehajtásakor.

Ezért választhat a Spark DataFrame betöltése és a Spark-táblázat létrehozása között aszerint, hogy a Spark-elemzést az elemzési tár rögzített pillanatképével vagy az elemzési tár legújabb pillanatképével összehasonlítva szeretné elvégezni.

Ha az elemzési lekérdezések gyakran használnak szűrőket, lehetősége van ezen mezők alapján particionálásra a jobb lekérdezési teljesítmény érdekében. A particionálási feladatot rendszeresen végrehajthatja egy Azure Synapse Spark-jegyzetfüzetből, hogy particionálást aktiváljon az elemzési tárban. Ez a particionált tároló az Azure Synapse-munkaterülethez társított elsődleges ADLS Gen2-tárfiókra mutat. További információkért tekintse meg az egyéni particionálás bevezetését és az egyéni particionálási cikkek konfigurálását .

Feljegyzés

Az Azure Cosmos DB for MongoDB-fiókok lekérdezéséhez tudjon meg többet az elemzési tár teljes hűségséma-ábrázolásáról és a használandó kiterjesztett tulajdonságnevekről.

Feljegyzés

Vegye figyelembe, hogy az alábbi parancsok mindegyike options megkülönbözteti a kis- és nagybetűket. Például azt kell használnia Gateway , amíg gateway hibát ad vissza.

Betöltés a Spark DataFrame-be

Ebben a példában létrehoz egy Spark DataFrame-et, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha Spark-műveleteket invoktál a DataFrame-hez. Ez a művelet nem befolyásolja a tranzakciós tárolót.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark-tábla létrehozása

Ebben a példában létrehoz egy Spark-táblát, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha SparkSQL-lekérdezéseket invoktál a táblára. Ez a művelet nem befolyásolja a tranzakciós tárolót, és nem jár adatáthelyezéssel. Ha úgy dönt, hogy törli ezt a Spark-táblát, a rendszer nem fogja érinteni a mögöttes Azure Cosmos DB-tárolót és a megfelelő elemzési tárat.

Ez a forgatókönyv kényelmesen használható a Spark-táblák külső eszközökkel való újrafelhasználására, valamint a mögöttes adatok elérhetőségének biztosítására a futásidejű használatra.

A Spark-táblázat létrehozásához a következő szintaxis szükséges:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Feljegyzés

Ha vannak olyan forgatókönyvek, amelyekben a mögöttes Azure Cosmos DB-tároló sémája idővel megváltozik; És ha azt szeretné, hogy a frissített séma automatikusan tükrözze a Spark-tábla lekérdezéseiben, ezt úgy érheti el, hogy true a Spark-tábla beállításai között adja meg a spark.cosmos.autoSchemaMerge beállítást.

Spark DataFrame írása az Azure Cosmos DB-tárolóba

Ebben a példában egy Spark DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázison kiépített kérelemegységeket.

A Python szintaxisa a következő:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Adatfolyam-adatkeret betöltése tárolóból

Ebben a kézmozdulatban a Spark Streaming funkcióval tölt be adatokat egy tárolóból egy adatkeretbe. Az adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.

Feljegyzés

Ha külső kódtárakra szeretne hivatkozni a Synapse Apache Sparkban, itt talál további információt. Ha például egy Spark DataFrame-et szeretne a MongoDB-hez készült Azure Cosmos DB-tárolóba betöltésre, használhatja a Spark MongoDB-összekötőt.

DataFrame betöltése az Azure Cosmos DB-tárolóból

Ebben a példában a Spark strukturált streamelési funkciójával adatokat tölthet be egy Azure Cosmos DB-tárolóból egy Spark streamelési adatkeretbe az Azure Cosmos DB változáscsatorna funkciójával. A Spark által használt ellenőrzőpont-adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.

Ha a /localReadCheckpointFolder mappa nincs létrehozva (az alábbi példában), az automatikusan létrejön. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Streamelési DataFrame írása az Azure Cosmos DB-tárolóba

Ebben a példában egy Stream DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket. Ha a /localWriteCheckpointFolder mappa nincs létrehozva (az alábbi példában), a mappa automatikusan létrejön.

A Python szintaxisa a következő:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

A Scala egyenértékű szintaxisa a következő:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Következő lépések