Interakce se službou Azure Cosmos DB pomocí Apache Sparku 2 ve službě Azure Synapse Link
Poznámka:
Informace o Azure Synapse Linku pro Azure Cosmos DB s využitím Sparku 3 najdete v tomto článku o Azure Synapse Linku pro Azure Cosmos DB ve Sparku 3.
V tomto článku se dozvíte, jak pracovat se službou Azure Cosmos DB pomocí Synapse Apache Sparku 2. Díky plné podpoře jazyka Scala, Pythonu, SparkSQL a C# je Synapse Apache Spark ústředním cílem analýzy, přípravy dat, datových věd a scénářů zkoumání dat ve službě Azure Synapse Link pro Azure Cosmos DB.
Při interakci se službou Azure Cosmos DB se podporují následující možnosti:
- Synapse Apache Spark umožňuje analyzovat data v kontejnerech Azure Cosmos DB, které jsou povolené pomocí Azure Synapse Linku téměř v reálném čase, aniž by to mělo vliv na výkon transakčních úloh. Pro dotazování analytického úložiště Azure Cosmos DB ze Sparku jsou k dispozici následující dvě možnosti:
- Načtení do datového rámce Spark
- Vytvoření tabulky Spark
- Synapse Apache Spark také umožňuje ingestovat data do služby Azure Cosmos DB. Je důležité si uvědomit, že data se vždy ingestují do kontejnerů Azure Cosmos DB prostřednictvím transakčního úložiště. Když je synapse Link povolený, všechny nové vložení, aktualizace a odstranění se pak automaticky synchronizují do analytického úložiště.
- Synapse Apache Spark také podporuje strukturované streamování Sparku se službou Azure Cosmos DB jako zdrojem a jímkou.
Následující části vás provedou syntaxí výše uvedených funkcí. Můžete si také prohlédnout modul Learn o dotazování služby Azure Cosmos DB pomocí Apache Sparku pro Azure Synapse Analytics. Gesta v pracovním prostoru Azure Synapse Analytics jsou navržená tak, aby poskytovala snadné integrované prostředí, které vám umožní začít. Gesta se zobrazí, když kliknete pravým tlačítkem na kontejner Azure Cosmos DB na kartě Data v pracovním prostoru Synapse. Pomocí gest můžete rychle vygenerovat kód a přizpůsobit ho vašim potřebám. Gesta jsou také ideální ke zjišťování dat jedním kliknutím.
Důležité
Měli byste vědět o některých omezeních analytického schématu, která by mohla vést k neočekávanému chování při operacích načítání dat. Například v analytickém schématu jsou k dispozici pouze prvních 1000 vlastností z transakčního schématu, vlastnosti s mezerami nejsou k dispozici atd. Pokud dochází k neočekávaným výsledkům, podívejte se na omezení schématu analytického úložiště, kde najdete další podrobnosti.
Dotazování analytického úložiště Azure Cosmos DB
Než se seznámíte se dvěma možnými možnostmi dotazování analytického úložiště Azure Cosmos DB, načtení do datového rámce Sparku a vytvoření tabulky Spark, je vhodné prozkoumat rozdíly v prostředí, abyste mohli zvolit možnost, která bude vyhovovat vašim potřebám.
Rozdíl v zkušenostech spočívá v tom, jestli by se základní změny dat v kontejneru Azure Cosmos DB měly automaticky promítat do analýzy provedené ve Sparku. Při registraci datového rámce Sparku nebo vytvoření tabulky Sparku v analytickém úložišti kontejneru se metadata kolem aktuálního snímku dat v analytickém úložišti načítají do Sparku za účelem efektivního nasdílení následných analýz. Je důležité si uvědomit, že vzhledem k tomu, že Spark se řídí opožděnými zásadami vyhodnocení, pokud není vyvolána akce v datovém rámci Sparku nebo v dotazu SparkSQL, se skutečná data nenačítají z analytického úložiště podkladového kontejneru.
Při načítání do datového rámce Sparku jsou načtená metadata uložená v mezipaměti po celou dobu relace Sparku. To znamená, že k vyhodnocení následných akcí prováděných s datovým rámcem se použije snímek analytického úložiště v okamžiku vytvoření datového rámce.
V případě vytvoření tabulky Sparku se metadata o stavu analytického úložiště neukládají do mezipaměti Sparku, ale znovu se načtou při každém spuštění dotazu SparkSQL do tabulky Sparku.
Můžete si tedy vybrat, jestli načtete datový rámec Sparku nebo vytvoříte tabulku Sparku podle toho, jestli chcete k vyhodnocení analýzy Sparku použít pevně daný snímek analytického úložiště nebo jeho nejnovější snímek.
Pokud vaše analytické dotazy často používají filtry, máte možnost rozdělit oddíly na základě těchto polí, abyste dosáhli lepšího výkonu dotazů. Úlohy dělení můžete pravidelně spouštět z poznámkového bloku Azure Synapse Spark a aktivovat dělení v analytickém úložišti. Toto dělené úložiště odkazuje na primární účet úložiště ADLS Gen2, který je propojený s vaším pracovním prostorem Azure Synapse. Další informace najdete v úvodu k vlastnímu dělení a postupu při konfiguraci vlastních článků o dělení.
Poznámka:
Pokud chcete dotazovat účty Azure Cosmos DB pro MongoDB, přečtěte si další informace o úplné reprezentaci schématu věrnosti v analytickém úložišti a o rozšířených názvech vlastností, které se mají použít.
Poznámka:
Upozorňujeme, že options
v následujících příkazech se rozlišují malá a velká písmena. Musíte například použít Gateway
, když gateway
se vrátí chyba.
Načtení do datového rámce Spark
V tomto příkladu vytvoříte datový rámec Sparku, který odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním akcí Sparku proti datovému rámci. Tato operace nemá vliv na transakční úložiště.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Vytvoření tabulky Spark
V tomto příkladu vytvoříte tabulku Sparku, která odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním dotazů SparkSQL na tabulku. Tato operace nemá vliv na transakční úložiště ani nedochází k žádnému přesunu dat. Pokud se rozhodnete odstranit tuto tabulku Sparku, nebude ovlivněn základní kontejner Azure Cosmos DB a odpovídající analytické úložiště.
Tento scénář je vhodný pro opakované použití tabulek Sparku prostřednictvím nástrojů třetích stran a zajištění přístupnosti podkladových dat za běhu.
Syntaxe pro vytvoření tabulky Sparku je následující:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Poznámka:
Pokud máte scénáře, ve kterých se schéma základního kontejneru Azure Cosmos DB mění v průběhu času; a pokud chcete, aby se aktualizované schéma automaticky odráželo v dotazech na tabulku Spark, můžete toho dosáhnout nastavením spark.cosmos.autoSchemaMerge
možnosti true
v možnostech tabulky Spark.
Zápis datového rámce Sparku do kontejneru Azure Cosmos DB
V tomto příkladu napíšete datový rámec Sparku do kontejneru Azure Cosmos DB. Tato operace ovlivní výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru Azure Cosmos DB nebo sdílené databázi.
Syntaxe v Pythonu by byla následující:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
Načtení streamovaného datového rámce z kontejneru
V tomto gestu použijete funkci streamování Sparku k načtení dat z kontejneru do datového rámce. Data budou uložená v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.
Poznámka:
Pokud chcete odkazovat na externí knihovny ve Službě Synapse Apache Spark, přečtěte si další informace tady. Pokud například chcete ingestovat datový rámec Sparku do kontejneru služby Azure Cosmos DB pro MongoDB, můžete použít konektor MongoDB pro Spark.
Načtení streamovaného datového rámce z kontejneru Azure Cosmos DB
V tomto příkladu použijete funkci strukturovaného streamování Sparku k načtení dat z kontejneru Azure Cosmos DB do streamovaného datového rámce Sparku pomocí funkcí kanálu změn ve službě Azure Cosmos DB. Data kontrolního bodu používaná Sparkem budou uložená v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.
Pokud složka /localReadCheckpointFolder není vytvořená (v příkladu níže), automaticky se vytvoří. Tato operace ovlivní výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru nebo sdílené databázi Azure Cosmos DB.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.readEnabled", "true")\
.option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
.option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
.option("spark.cosmos.changeFeed.queryName", "streamQuery")\
.load()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.readEnabled", "true").
option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
option("spark.cosmos.changeFeed.queryName", "streamQuery").
load()
Zápis streamovaného datového rámce do kontejneru Azure Cosmos DB
V tomto příkladu napíšete datový rámec streamování do kontejneru Azure Cosmos DB. Tato operace ovlivní výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru nebo sdílené databázi Azure Cosmos DB. Pokud složka /localWriteCheckpointFolder není vytvořená (v následujícím příkladu), automaticky se vytvoří.
Syntaxe v Pythonu by byla následující:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
def writeBatchToCosmos(batchDF, batchId):
batchDF.persist()
print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.unpersist()
streamQuery = dfStream\
.writeStream\
.foreachBatch(writeBatchToCosmos) \
.option("checkpointLocation", "/localWriteCheckpointFolder")\
.start()
streamQuery.awaitTermination()
Ekvivalentní syntaxe v jazyce Scala by byla následující:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
val query = dfStream.
writeStream.
foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
batchDF.unpersist()
()
}.
option("checkpointLocation", "/localWriteCheckpointFolder").
start()
query.awaitTermination()