Interakce se službou Azure Cosmos DB pomocí Apache Sparku 3 ve službě Azure Synapse Link

V tomto článku se dozvíte, jak pracovat se službou Azure Cosmos DB pomocí Synapse Apache Spark 3. Díky plné podpoře jazyků Scala, Python, SparkSQL a C# je Synapse Apache Spark 3 ústředním centrem scénářů analýz, přípravy dat, datových věd a zkoumání dat ve službě Azure Synapse Link pro Azure Cosmos DB.

Při interakci se službou Azure Cosmos DB jsou podporovány následující možnosti:

  • Synapse Apache Spark 3 umožňuje téměř v reálném čase analyzovat data v kontejnerech Azure Cosmos DB s povoleným Azure Synapse Linkem, aniž by to mělo vliv na výkon transakčních úloh. Pro dotazování analytického úložiště Azure Cosmos DB ze Sparku jsou k dispozici následující dvě možnosti:
    • Načtení do datového rámce Sparku
    • Vytvoření tabulky Sparku
  • Synapse Apache Spark také umožňuje ingestovat data do služby Azure Cosmos DB. Je důležité si uvědomit, že data se vždy ingestují do kontejnerů Azure Cosmos DB prostřednictvím transakčního úložiště. Když je povolená Synapse Link, všechna nová vložení, aktualizace a odstranění se pak automaticky synchronizují s analytickým úložištěm.
  • Synapse Apache Spark také podporuje strukturované streamování Sparku se službou Azure Cosmos DB jako zdrojem i jímkou.

Následující části vás provedou syntaxí výše uvedených funkcí. Můžete se také podívat na modul Learn o dotazování služby Azure Cosmos DB pomocí Apache Sparku pro Azure Synapse Analytics. Gesta v pracovním prostoru Azure Synapse Analytics jsou navržená tak, aby poskytovala jednoduché prostředí, které vám umožní začít. Gesta se zobrazí po kliknutí pravým tlačítkem na kontejner Azure Cosmos DB na kartě Data pracovního prostoru Synapse. Pomocí gest můžete rychle vygenerovat kód a přizpůsobit ho vašim potřebám. Gesta jsou také ideální ke zjišťování dat jedním kliknutím.

Důležité

Měli byste vědět o některých omezeních v analytickém schématu, která by mohla vést k neočekávanému chování při operacích načítání dat. Například v analytickém schématu je k dispozici pouze prvních 1 000 vlastností z transakčního schématu, vlastnosti s mezerami nejsou k dispozici atd. Pokud dochází k neočekávaným výsledkům, projděte si omezení schématu analytického úložiště , kde najdete další podrobnosti.

Dotazování analytického úložiště Azure Cosmos DB

Než se seznámíte se dvěma možnými možnostmi dotazování analytického úložiště Azure Cosmos DB, načtením do datového rámce Sparku a vytvořením tabulky Spark, je vhodné prozkoumat rozdíly v prostředí, abyste si mohli vybrat možnost, která vyhovuje vašim potřebám.

Rozdíl v zkušenostech spočívá v tom, jestli se změny podkladových dat v kontejneru Azure Cosmos DB mají automaticky promítnout do analýzy provedené ve Sparku. Když je zaregistrovaný datový rámec Sparku nebo se vytvoří tabulka Sparku pro analytické úložiště kontejneru, metadata kolem aktuálního snímku dat v analytickém úložišti se načte do Sparku, aby byla následná analýza efektivní. Je důležité si uvědomit, že vzhledem k tomu, že Spark dodržuje opožděné zásady vyhodnocení, pokud se v datovém rámci Sparku nevyvolá akce nebo se vůči tabulce SparkSQL nespustí dotaz SparkSQL, skutečná data se z analytického úložiště podkladového kontejneru nenačítají.

Při načítání do datového rámce Sparku jsou načtená metadata uložená v mezipaměti po celou dobu relace Sparku. To znamená, že k vyhodnocení následných akcí prováděných s datovým rámcem se použije snímek analytického úložiště v okamžiku vytvoření datového rámce.

V případě vytvoření tabulky Sparku se metadata o stavu analytického úložiště neukládají do mezipaměti Sparku, ale znovu se načtou při každém spuštění dotazu SparkSQL do tabulky Sparku.

Můžete si tedy vybrat, jestli načtete datový rámec Sparku, nebo vytvoříte tabulku Sparku podle toho, jestli chcete k vyhodnocení analýzy Sparku použít pevně daný snímek analytického úložiště nebo jeho nejnovější snímek.

Poznámka

Pokud chcete dotazovat účty Azure Cosmos DB pro MongoDB, přečtěte si další informace o plně věrné reprezentaci schématu v analytickém úložišti a rozšířených názvech vlastností, které se mají použít.

Poznámka

Mějte na paměti, že ve všech options následujících příkazech se rozlišují malá a velká písmena.

Načtení do datového rámce Sparku

V tomto příkladu vytvoříte datový rámec Sparku, který odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním akcí Sparku u datového rámce. Tato operace nemá vliv na transakční úložiště.

Syntaxe v Pythonu by byla následující:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Ekvivalentní syntaxe v jazyce Scala by byla následující:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Vytvoření tabulky Sparku

V tomto příkladu vytvoříte tabulku Sparku, která odkazuje na analytické úložiště Azure Cosmos DB. Pak můžete provést další analýzu vyvoláním dotazů SparkSQL na tabulku. Tato operace nemá vliv na transakční úložiště ani nedochází k přesunu dat. Pokud se rozhodnete odstranit tuto tabulku Sparku, nebude to mít vliv na základní kontejner Azure Cosmos DB a odpovídající analytické úložiště.

Tento scénář je vhodný k opakovanému použití tabulek Sparku prostřednictvím nástrojů třetích stran a zajištění přístupnosti k podkladovým datům za běhu.

Syntaxe pro vytvoření tabulky Sparku je následující:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Poznámka

Pokud máte scénáře, ve kterých se schéma základního kontejneru Azure Cosmos DB mění v průběhu času, a pokud chcete, aby se aktualizované schéma automaticky projevilo v dotazech na tabulku Spark, můžete toho dosáhnout tak, že v možnostech tabulky Sparku nastavíte spark.cosmos.autoSchemaMerge možnost na true .

Zápis datového rámce Sparku do kontejneru Azure Cosmos DB

V tomto příkladu zapíšete datový rámec Sparku do kontejneru Azure Cosmos DB. Tato operace bude mít vliv na výkon transakčních úloh a spotřebuje jednotky žádostí zřízené v kontejneru Azure Cosmos DB nebo sdílené databázi.

Syntaxe v Pythonu by byla následující:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Ekvivalentní syntaxe v jazyce Scala by byla následující:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Načtení streamovaného datového rámce z kontejneru

V tomto gestu použijete funkci streamování Sparku k načtení dat z kontejneru do datového rámce. Data se uloží v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.

Poznámka

Pokud chcete odkazovat na externí knihovny v Synapse Apache Sparku, přečtěte si další informace tady. Pokud například chcete ingestovat datový rámec Sparku do kontejneru Azure Cosmos DB pro MongoDB, můžete pro Spark využít konektor MongoDB.

Načtení streamovaného datového rámce z kontejneru Azure Cosmos DB

V tomto příkladu použijete schopnost strukturovaného streamování Sparku k načtení dat z kontejneru Azure Cosmos DB do datového rámce streamování Sparku pomocí funkce kanálu změn ve službě Azure Cosmos DB. Data kontrolního bodu používaná Sparkem se uloží v primárním účtu Data Lake (a systému souborů), který jste připojili k pracovnímu prostoru.

Syntaxe v Pythonu by byla následující:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Ekvivalentní syntaxe v jazyce Scala by byla následující:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Zápis streamovaného datového rámce do kontejneru Azure Cosmos DB

V tomto příkladu zapíšete streamovací datový rámec do kontejneru Azure Cosmos DB. Tato operace bude mít vliv na výkon transakčních úloh a využívat jednotky žádostí zřízené v kontejneru nebo sdílené databázi Azure Cosmos DB. Pokud se nevytvořila složka /localWriteCheckpointFolder (v následujícím příkladu), vytvoří se automaticky.

Syntaxe v Pythonu by byla následující:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Ekvivalentní syntaxe v jazyce Scala by byla následující:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Další kroky