Az Azure Cosmos DB használata az Apache Spark 2 használatával az Azure Synapse Linkben
Feljegyzés
Ha az Azure Cosmos DB-hez készült Azure Synapse Linket a Spark 3 használatával szeretné használni, tekintse meg ezt a cikket : Azure Synapse Link for Azure Cosmos DB a Spark 3-on
Ebben a cikkben megtudhatja, hogyan használhatja az Azure Cosmos DB-t a Synapse Apache Spark 2 használatával. A Scala, a Python, a SparkSQL és a C# teljes körű támogatásával a Synapse Apache Spark központi szerepet kap az Azure Cosmos DB-hez készült Azure Synapse Link elemzési, adatelemzési, adatelemzési és adatfeltárási forgatókönyveiben.
Az Azure Cosmos DB használata során az alábbi képességek támogatottak:
- A Synapse Apache Spark lehetővé teszi olyan adatok elemzését az Azure Cosmos DB-tárolókban, amelyek közel valós időben engedélyezve vannak az Azure Synapse Linkkel anélkül, hogy befolyásolni szeretné a tranzakciós számítási feladatok teljesítményét. Az Alábbi két lehetőség áll rendelkezésre az Azure Cosmos DB elemzési tár lekérdezéséhez a Sparkból:
- Betöltés a Spark DataFrame-be
- Spark-tábla létrehozása
- A Synapse Apache Spark lehetővé teszi az adatok Azure Cosmos DB-be való betöltését is. Fontos megjegyezni, hogy az adatok mindig az Azure Cosmos DB-tárolókba kerülnek a tranzakciós tárolón keresztül. Ha a Synapse Link engedélyezve van, a rendszer automatikusan szinkronizálja az új beszúrásokat, frissítéseket és törléseket az elemzési tárba.
- A Synapse Apache Spark a Spark strukturált streamelését is támogatja az Azure Cosmos DB-vel forrásként és fogadóként.
A következő szakaszok végigvezetik a fenti képességek szintaxisán. A Learn modulból azt is megtudhatja, hogyan kérdezheti le az Azure Cosmos DB-t az Azure Synapse Analyticshez készült Apache Spark használatával. Az Azure Synapse Analytics-munkaterület kézmozdulatai úgy lettek kialakítva, hogy az első lépésekhez egyszerű, beépített élményt nyújtson. A kézmozdulatok akkor jelennek meg, ha a jobb gombbal egy Azure Cosmos DB-tárolóra kattint a Synapse-munkaterület Adat lapján. A kézmozdulatokkal gyorsan hozhat létre kódot, és testre szabhatja azt az igényei szerint. A kézmozdulatok az adatok egy kattintással való felderítéséhez is ideálisan használhatók.
Fontos
Tisztában kell lennie az elemzési séma néhány olyan korlátozásával, amelyek az adatbetöltési műveletek váratlan viselkedéséhez vezethetnek. Például a tranzakciós sémából csak az első 1000 tulajdonság érhető el az elemzési sémában, a szóközökkel rendelkező tulajdonságok nem érhetők el stb. Ha váratlan eredményeket tapasztal, további részletekért tekintse meg az elemzési tár sémakorlátozásait .
Azure Cosmos DB-elemzési tár lekérdezése
Mielőtt megismerkedik az Azure Cosmos DB elemzési tár lekérdezésének, a Spark DataFrame-be való betöltésének és a Spark-tábla létrehozásának két lehetséges lehetőségével, érdemes megvizsgálnia a felhasználói élmény különbségeit, hogy kiválaszthatja az igényeinek megfelelő lehetőséget.
A tapasztalatbeli különbség az, hogy az Azure Cosmos DB-tároló mögöttes adatváltozásainak automatikusan tükröződniük kell-e a Sparkban végzett elemzésben. Ha egy Spark DataFrame regisztrálva van, vagy egy Spark-tábla jön létre egy tároló elemzési tárolóján, a rendszer az elemzési tárban lévő adatok aktuális pillanatképe körüli metaadatokat a Sparkba olvassa be a későbbi elemzés hatékony leküldése érdekében. Fontos megjegyezni, hogy mivel a Spark egy lusta kiértékelési szabályzatot követ, hacsak nem hív meg egy műveletet a Spark DataFrame-en, vagy nem hajt végre SparkSQL-lekérdezést a Spark-táblán, a rendszer nem olvassa be a tényleges adatokat az alapul szolgáló tároló elemzési tárolójából.
A Spark DataFrame-be való betöltéskor a beolvasott metaadatok a Spark-munkamenet teljes élettartama alatt gyorsítótárazva maradnak, így a DataFrame-en meghívott további műveletek kiértékelése a DataFrame létrehozásakor az elemzési tárba került pillanatkép alapján történik.
Ezzel szemben a Spark-táblák létrehozásakor a rendszer nem gyorsítótárazza a Sparkban az elemzési tár állapotának metaadatait, hanem újra betölti őket a Spark-táblán végrehajtott összes SparkSQL-lekérdezés végrehajtásakor.
Ezért választhat a Spark DataFrame betöltése és a Spark-táblázat létrehozása között aszerint, hogy a Spark-elemzést az elemzési tár rögzített pillanatképével vagy az elemzési tár legújabb pillanatképével összehasonlítva szeretné elvégezni.
Ha az elemzési lekérdezések gyakran használnak szűrőket, lehetősége van ezen mezők alapján particionálásra a jobb lekérdezési teljesítmény érdekében. A particionálási feladatot rendszeresen végrehajthatja egy Azure Synapse Spark-jegyzetfüzetből, hogy particionálást aktiváljon az elemzési tárban. Ez a particionált tároló az Azure Synapse-munkaterülethez társított elsődleges ADLS Gen2-tárfiókra mutat. További információkért tekintse meg az egyéni particionálás bevezetését és az egyéni particionálási cikkek konfigurálását .
Feljegyzés
Az Azure Cosmos DB for MongoDB-fiókok lekérdezéséhez tudjon meg többet az elemzési tár teljes hűségséma-ábrázolásáról és a használandó kiterjesztett tulajdonságnevekről.
Feljegyzés
Vegye figyelembe, hogy az alábbi parancsok mindegyike options
megkülönbözteti a kis- és nagybetűket. Például azt kell használnia Gateway
, amíg gateway
hibát ad vissza.
Betöltés a Spark DataFrame-be
Ebben a példában létrehoz egy Spark DataFrame-et, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha Spark-műveleteket invoktál a DataFrame-hez. Ez a művelet nem befolyásolja a tranzakciós tárolót.
A Python szintaxisa a következő:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
A Scala egyenértékű szintaxisa a következő:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Spark-tábla létrehozása
Ebben a példában létrehoz egy Spark-táblát, amely az Azure Cosmos DB elemzési tárára mutat. Ezután további elemzéseket végezhet, ha SparkSQL-lekérdezéseket invoktál a táblára. Ez a művelet nem befolyásolja a tranzakciós tárolót, és nem jár adatáthelyezéssel. Ha úgy dönt, hogy törli ezt a Spark-táblát, a rendszer nem fogja érinteni a mögöttes Azure Cosmos DB-tárolót és a megfelelő elemzési tárat.
Ez a forgatókönyv kényelmesen használható a Spark-táblák külső eszközökkel való újrafelhasználására, valamint a mögöttes adatok elérhetőségének biztosítására a futásidejű használatra.
A Spark-táblázat létrehozásához a következő szintaxis szükséges:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Feljegyzés
Ha vannak olyan forgatókönyvek, amelyekben a mögöttes Azure Cosmos DB-tároló sémája idővel megváltozik; És ha azt szeretné, hogy a frissített séma automatikusan tükrözze a Spark-tábla lekérdezéseiben, ezt úgy érheti el, hogy true
a Spark-tábla beállításai között adja meg a spark.cosmos.autoSchemaMerge
beállítást.
Spark DataFrame írása az Azure Cosmos DB-tárolóba
Ebben a példában egy Spark DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázison kiépített kérelemegységeket.
A Python szintaxisa a következő:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
A Scala egyenértékű szintaxisa a következő:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
Adatfolyam-adatkeret betöltése tárolóból
Ebben a kézmozdulatban a Spark Streaming funkcióval tölt be adatokat egy tárolóból egy adatkeretbe. Az adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.
Feljegyzés
Ha külső kódtárakra szeretne hivatkozni a Synapse Apache Sparkban, itt talál további információt. Ha például egy Spark DataFrame-et szeretne a MongoDB-hez készült Azure Cosmos DB-tárolóba betöltésre, használhatja a Spark MongoDB-összekötőt.
DataFrame betöltése az Azure Cosmos DB-tárolóból
Ebben a példában a Spark strukturált streamelési funkciójával adatokat tölthet be egy Azure Cosmos DB-tárolóból egy Spark streamelési adatkeretbe az Azure Cosmos DB változáscsatorna funkciójával. A Spark által használt ellenőrzőpont-adatok a munkaterülethez csatlakoztatott elsődleges Data Lake-fiókban (és fájlrendszerben) lesznek tárolva.
Ha a /localReadCheckpointFolder mappa nincs létrehozva (az alábbi példában), az automatikusan létrejön. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket.
A Python szintaxisa a következő:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.readEnabled", "true")\
.option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
.option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
.option("spark.cosmos.changeFeed.queryName", "streamQuery")\
.load()
A Scala egyenértékű szintaxisa a következő:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.readEnabled", "true").
option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
option("spark.cosmos.changeFeed.queryName", "streamQuery").
load()
Streamelési DataFrame írása az Azure Cosmos DB-tárolóba
Ebben a példában egy Stream DataFrame-et fog írni egy Azure Cosmos DB-tárolóba. Ez a művelet hatással lesz a tranzakciós számítási feladatok teljesítményére, és felhasználja az Azure Cosmos DB-tárolón vagy a megosztott adatbázisban kiépített kérelemegységeket. Ha a /localWriteCheckpointFolder mappa nincs létrehozva (az alábbi példában), a mappa automatikusan létrejön.
A Python szintaxisa a következő:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
def writeBatchToCosmos(batchDF, batchId):
batchDF.persist()
print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.unpersist()
streamQuery = dfStream\
.writeStream\
.foreachBatch(writeBatchToCosmos) \
.option("checkpointLocation", "/localWriteCheckpointFolder")\
.start()
streamQuery.awaitTermination()
A Scala egyenértékű szintaxisa a következő:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
val query = dfStream.
writeStream.
foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
batchDF.unpersist()
()
}.
option("checkpointLocation", "/localWriteCheckpointFolder").
start()
query.awaitTermination()
Következő lépések
- Minták az Azure Synapse Link GitHubon való használatának megkezdéséhez
- Az Azure Cosmos DB-hez készült Azure Synapse Link által támogatott információk
- Csatlakozás az Azure Cosmos DB-hez készült Synapse Linkhez
- Tekintse meg a Learn modult, amely bemutatja, hogyan kérdezheti le az Azure Cosmos DB-t az Azure Synapse Analyticshez készült Apache Spark használatával.