Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
PLATÍ PRO: NoSQL
Konektor Spark služby Azure Cosmos DB poskytuje výkonný způsob zpracování kanálu změn ve velkém měřítku pomocí Apache Sparku. Konektor používá sadu Java SDK pod ní a implementuje model vyžádané replikace , který transparentně distribuuje zpracování napříč exekutory Sparku, což je ideální pro scénáře zpracování velkých objemů dat.
Jak funguje konektor Spark
Konektor Spark pro službu Azure Cosmos DB vychází ze sady Java SDK služby Azure Cosmos DB a implementuje přístup k modelu vyžádání změn ke čtení kanálu změn. Mezi klíčové charakteristiky patří:
- Základy sady Java SDK: Pro spolehlivé zpracování kanálu změn používá robustní sadu Java SDK služby Azure Cosmos DB.
- Implementace modelu přijetí změn: Řídí se vzorem modelu vyžádání změn kanálu změn , který vám dává kontrolu nad tempem zpracování.
- Distribuované zpracování: Automaticky distribuuje zpracování kanálu změn mezi několik exekutorů Sparku pro paralelní zpracování.
- Transparentní škálování: Konektor zpracovává dělení a distribuci zatížení bez nutnosti ručního zásahu.
Jedinečná funkce kontrolních bodů
Jednou z klíčových výhod použití konektoru Sparku pro zpracování kanálu změn je integrovaný mechanismus kontrolních bodů. Tato funkce poskytuje:
- Automatické obnovení: Předefinovaný mechanismus pro obnovení při zpracování kanálu změn ve velkém měřítku
- Odolnost proti chybám: Schopnost obnovit zpracování z posledního kontrolního bodu v případě selhání
- Správa stavu: Udržuje stav zpracování napříč relacemi Sparku a restartováním clusteru
- Škálovatelnost: Podporuje vytváření kontrolních bodů napříč distribuovanými prostředími Sparku.
Tato funkce kontrolních bodů je jedinečná pro konektor Spark a není dostupná při přímém použití sad SDK, což je zvláště cenné pro produkční scénáře vyžadující vysokou dostupnost a spolehlivost.
Výstraha
Konfigurace spark.cosmos.changeFeed.startFrom se ignoruje, pokud jsou v umístění kontrolního bodu existující záložky. Při obnovení z kontrolního bodu bude spojnice pokračovat z poslední zpracované pozice místo zadaného počátečního bodu.
Kdy použít Spark ke zpracování kanálu změn
V těchto scénářích zvažte použití konektoru Spark pro zpracování kanálu změn:
- Rozsáhlé zpracování dat: Pokud potřebujete zpracovat velké objemy dat kanálu změn, které překračují možnosti jednoho počítače
- Složité transformace: Když zpracování kanálu změn zahrnuje složité transformace dat, agregace nebo spojení s jinými datovými sadami
- Distribuované analýzy: Pokud potřebujete provádět analýzy v reálném čase nebo téměř v reálném čase s daty kanálu změn v distribuovaném prostředí
- Integrace s datovými kanály: Při zpracování kanálu změn je součástí větších kanálů ETL/ELT, které již používají Spark
- Požadavky na odolnost proti chybám: Pokud potřebujete robustní kontrolní body a mechanismy obnovení pro produkční úlohy
- Zpracování více kontejnerů: Pokud potřebujete zpracovat kanály změn z více kontejnerů současně
V případě jednodušších scénářů nebo v případě, že potřebujete jemně odstupňovanou kontrolu nad individuálním zpracováním dokumentů, zvažte použití procesoru kanálu změn nebo načíst model přímo se sadami SDK.
Příklady kódu
Následující příklady ukazují, jak číst z kanálu změn pomocí konektoru Spark. Podrobnější příklady najdete v úplných ukázkových poznámkových blocích:
- Ukázka strukturovaného streamování v Pythonu – Zpracování dat taxi v NYC s využitím kanálu změn
- Ukázka migrace kontejnerů Scala – Migrace kontejnerů za provozu s využitím kanálu změn
# Configure change feed reading
changeFeedConfig = {
"spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.com:443/",
"spark.cosmos.accountKey": "<account-key>",
"spark.cosmos.database": "<database-name>",
"spark.cosmos.container": "<container-name>",
# Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
"spark.cosmos.changeFeed.startFrom": "Beginning", # "Now" or "2020-02-10T14:15:03"
"spark.cosmos.changeFeed.mode": "LatestVersion", # or "AllVersionsAndDeletes"
# Control batch size - if not set, all available data processed in first batch
"spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
"spark.cosmos.read.partitioning.strategy": "Restrictive"
}
# Read change feed as a streaming DataFrame
changeFeedDF = spark \
.readStream \
.format("cosmos.oltp.changeFeed") \
.options(**changeFeedConfig) \
.load()
# Configure output settings with checkpointing
outputConfig = {
"spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.com:443/",
"spark.cosmos.accountKey": "<target-account-key>",
"spark.cosmos.database": "<target-database>",
"spark.cosmos.container": "<target-container>",
"spark.cosmos.write.strategy": "ItemOverwrite"
}
# Process and write the change feed data with checkpointing
query = changeFeedDF \
.selectExpr("*") \
.writeStream \
.format("cosmos.oltp") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/changefeed-checkpoint") \
.options(**outputConfig) \
.start()
# Wait for the streaming query to finish
query.awaitTermination()
Možnosti konfigurace klíče
Při práci s kanálem změn ve Sparku jsou tyto možnosti konfigurace obzvláště důležité:
-
spark.cosmos.changeFeed.startFrom: Určuje, kde začít číst informační kanál změn.
-
"Beginning"- Začněte od začátku kanálu změn. -
"Now"- Začít od aktuálního času -
"2020-02-10T14:15:03"- Začněte od určitého časového razítka (formát ISO 8601) - Poznámka: Toto nastavení se ignoruje, pokud existují záložky v umístění kontrolního bodu.
-
-
spark.cosmos.changeFeed.mode: Určuje režim kanálu změn.
-
"LatestVersion"– Zpracování pouze nejnovější verze změněných dokumentů -
"AllVersionsAndDeletes"- Zpracování všech verzí změn včetně odstranění
-
-
spark.cosmos.changeFeed.itemCountPerTriggerHint: Řídí velikost dávkového zpracování
- Přibližný maximální počet položek přečtených z kanálu změn pro každou mikrodávku nebo trigger
- Příklad:
"50000" - Důležité: Pokud není nastavená, budou všechna dostupná data v kanálu změn zpracována v první mikrodávce.
- checkpointLocation: Určuje, kam se mají ukládat informace kontrolního bodu pro odolnost proti chybám a obnovení.
- spark.cosmos.read.partitioning.strategy: Řídí způsob dělení dat napříč exekutory Sparku.
Další kroky
- Další informace o vzorech návrhu kanálu změn
- Prozkoumání modelu vyžádání změn kanálu změn
- Vysvětlení procesoru kanálu změn pro scénáře s jedním počítačem
- Další možnosti konfigurace najdete v dokumentaci ke konektoru Spark.
- Projděte si režimy kanálu změn pro různé scénáře zpracování.