Interactie met Azure Cosmos DB met Apache Spark 2 in Azure Synapse Link
Notitie
Raadpleeg voor Azure Synapse Link voor Azure Cosmos DB met behulp van Spark 3 dit artikel Azure Synapse Link voor Azure Cosmos DB in Spark 3
In dit artikel leert u hoe u kunt communiceren met Azure Cosmos DB met behulp van Synapse Apache Spark 2. Met de volledige ondersteuning voor Scala, Python, SparkSQL en C#, staat Apache Spark Synapse centraal voor analyse, data engineering, gegevenswetenschap en gegevensexploratie in Azure Synapse Link voor Azure Cosmos DB.
De volgende mogelijkheden worden ondersteund bij interactie met Azure Cosmos DB:
- Met Synapse Apache Spark kunt u gegevens in uw Azure Cosmos DB-containers die zijn ingeschakeld met Azure Synapse Link in bijna realtime analyseren zonder dat dit van invloed is op de prestaties van uw transactionele workloads. De volgende twee opties zijn beschikbaar om een query uit te voeren op de analytische opslag van Azure Cosmos DB vanuit Spark:
- Laden naar Spark DataFrame
- Spark-tabel maken
- Met Synapse Apache Spark kunt u ook gegevens opnemen in Azure Cosmos DB. Het is belangrijk te weten dat gegevens altijd worden opgenomen in Azure Cosmos DB-containers via de transactionele opslag. Wanneer Synapse Link is ingeschakeld, worden nieuwe toevoegingen, updates en verwijderingen automatisch gesynchroniseerd met de analytische opslag.
- Synapse Apache Spark biedt ook ondersteuning voor Spark Structured Streaming met Azure Cosmos DB als een bron en een sink.
In de volgende secties wordt stapsgewijs uitgelegd hoe u de syntaxis van de bovenstaande mogelijkheden kunt volgen. U kunt ook de Learn-module bekijken over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics. Bewegingen in Azure Synapse Analytics-werkruimte zijn ontworpen om een eenvoudige out-of-the-box-ervaring te bieden om aan de slag te gaan. Gebaren worden weergegeven wanneer u met de rechtermuisknop op een Azure Cosmos DB-container klikt op het tabblad Gegevens van de Synapse-werkruimte. Met bewegingen kunt u snel code genereren en deze aanpassen aan uw behoeften. Bewegingen zijn ook ideaal om met één klik gegevens te ontdekken.
Belangrijk
U moet rekening houden met enkele beperkingen in het analytische schema die kunnen leiden tot onverwacht gedrag bij het laden van gegevens. Als voorbeeld zijn slechts de eerste 1000 eigenschappen van het transactionele schema beschikbaar in het analytische schema, eigenschappen met spaties zijn niet beschikbaar, enzovoort. Als u onverwachte resultaten ondervindt, controleert u de beperkingen van het schema voor analytische opslag voor meer informatie.
Analytische opslag van Azure Cosmos DB
Voordat u meer informatie krijgt over de twee mogelijke opties om een query uit te brengen op Azure Cosmos DB-analytische opslag, het laden van Spark DataFrame en het maken van een Spark-tabel, is het een goed idee om de opties in de ervaring te verkennen zodat u de optie kunt kiezen die geschikt is voor uw behoeften.
Het verschil in ervaring is om te bepalen of onderliggende gegevenswijzigingen in de Azure Cosmos DB-container automatisch moeten worden doorgevoerd in de analyse die in Spark wordt uitgevoerd. Wanneer een Spark DataFrame wordt geregistreerd of een Spark-tabel wordt gemaakt op basis van de analytische opslag van een container, worden de metagegevens van de huidige momentopname van de gegevens in het analytische archief naar Spark opgehaald voor efficiënte pushdown van de volgende analyse. Het is belangrijk te weten dat, omdat Spark een lui evaluatiebeleid volgt, tenzij een actie wordt aangeroepen op de Spark-DataFrame of een SparkSQL-query wordt uitgevoerd op basis van de Spark-tabel, de werkelijke gegevens niet worden opgehaald uit de analytische opslag van de onderliggende container.
In het geval van het laden naar Spark DataFrame, worden de opgehaalde metagegevens in de cache opgeslagen tijdens de levensduur van de Spark-sessie. Daarom worden de volgende acties die worden uitgevoerd op de DataFrame geëvalueerd op basis van de momentopname van de analytische opslag op het moment dat DataFrame wordt gemaakt.
Daarentegen, in het geval van het maken van een Spark-tabel, worden de metagegevens van de status van de analytische opslag niet in de cache opgeslagen in Spark en worden opnieuw geladen op elke SparkSQL-query-uitvoering in de Spark-tabel.
Daarom kunt u kiezen tussen het laden van Spark DataFrame en het maken van een Spark-tabel op basis van de vraag of u wilt dat uw Spark-analyse wordt geëvalueerd voor respectievelijk een vaste momentopname van de analytische opslag of met de meest recente momentopname van de analytische opslag.
Als uw analytische query's vaak filters hebben, hebt u de mogelijkheid om te partitioneren op basis van deze velden voor betere queryprestaties. U kunt periodiek partitioneringstaak uitvoeren vanuit een Azure Synapse Spark-notebook om partitionering in analytische opslag te activeren. Dit gepartitioneerde archief verwijst naar het primaire ADLS Gen2-opslagaccount dat is gekoppeld aan uw Azure Synapse-werkruimte. Zie de inleiding tot aangepaste partitionering en het configureren van artikelen over aangepaste partitionering voor meer informatie.
Notitie
Als u query's wilt uitvoeren op Azure Cosmos DB voor MongoDB-accounts, vindt u meer informatie over de schemaweergave van volledige betrouwbaarheid in de analytische opslag en de uitgebreide eigenschapsnamen die moeten worden gebruikt.
Notitie
Houd er rekening mee dat alle options
onderstaande opdrachten hoofdlettergevoelig zijn. U moet bijvoorbeeld een gateway
fout retournerenGateway
.
Laden naar Spark DataFrame
In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op de transactionele opslag.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Spark-tabel maken
In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op het transactionele archief en maakt geen gegevensverplaatsing. Als u besluit deze Spark-tabel te verwijderen, worden de onderliggende Azure Cosmos DB-container en de bijbehorende analytische opslag niet beïnvloed.
Dit scenario is handig voor hergebruik van Spark-tabellen via hulpprogramma's van derden en om de gegevens voor de uitvoering toegankelijk te maken.
De syntaxis voor het maken van een Spark-tabel is als volgt:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Notitie
Als u scenario's hebt waarin het schema van de onderliggende Azure Cosmos DB-container na verloop van tijd verandert; en als u wilt dat het bijgewerkte schema automatisch wordt weergegeven in de query's voor de Spark-tabel, kunt u dit doen door de optie true
in te spark.cosmos.autoSchemaMerge
stellen in de Spark-tabelopties.
Spark DataFrame naar Azure Cosmos DB-container schrijven
In dit voorbeeld schrijft u een Spark DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of de gedeelde database.
De syntaxis van Python is als volgt:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
Streaming-dataframe laden vanuit een container
Met deze beweging gebruikt u de mogelijkheid van Spark-streaming om gegevens uit een container in een dataframe te laden. De gegevens worden opgeslagen in het primaire Data Lake-account (en bestandssysteem) dat u hebt gekoppeld aan de werkruimte.
Notitie
Als u op zoek bent naar externe bibliotheken in Synapse Apache Spark, vindt u hier meer informatie. Als u bijvoorbeeld een Spark DataFrame wilt opnemen in een container van Azure Cosmos DB voor MongoDB, kunt u de MongoDB-connector voor Spark gebruiken.
Streaming-DataFrame laden vanuit een Azure Cosmos DB-container
In dit voorbeeld gebruikt u de structured streaming-mogelijkheid van Spark voor het laden van gegevens uit een Azure Cosmos DB-container naar een Spark-streaming DataFrame met behulp van de wijzigingsfunctie voor feeds in Azure Cosmos DB. De controlepuntgegevens die worden gebruikt door Spark, worden opgeslagen in het primaire datalake-account (en bestandssysteem) dat u hebt verbonden met de werkruimte.
Als de map /localReadCheckpointFolder niet is gemaakt (in het onderstaande voorbeeld), gebeurt dit automatisch. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of gedeelde database.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.readEnabled", "true")\
.option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
.option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
.option("spark.cosmos.changeFeed.queryName", "streamQuery")\
.load()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.readEnabled", "true").
option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
option("spark.cosmos.changeFeed.queryName", "streamQuery").
load()
Streaming-DataFrame schrijven naar een Azure Cosmos DB-container
In dit voorbeeld schrijft u een streaming-DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of gedeelde database. Als de map /localWriteCheckpointFolder niet is gemaakt (in het onderstaande voorbeeld), gebeurt dit automatisch.
De syntaxis van Python is als volgt:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
def writeBatchToCosmos(batchDF, batchId):
batchDF.persist()
print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.write.upsertEnabled", "true")\
.mode('append')\
.save()
print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
batchDF.unpersist()
streamQuery = dfStream\
.writeStream\
.foreachBatch(writeBatchToCosmos) \
.option("checkpointLocation", "/localWriteCheckpointFolder")\
.start()
streamQuery.awaitTermination()
De equivalente syntaxis in Scala is de volgende:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway.
val query = dfStream.
writeStream.
foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.write.upsertEnabled", "true").
mode(SaveMode.Overwrite).
save()
println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
batchDF.unpersist()
()
}.
option("checkpointLocation", "/localWriteCheckpointFolder").
start()
query.awaitTermination()
Volgende stappen
- Voorbeelden om aan de slag te gaan met Azure Synapse Link op GitHub
- Wat wordt ondersteund in Azure Synapse Link voor Azure Cosmos DB
- Verbinding maken met Synapse Link voor Azure Cosmos DB
- Bekijk de Learn-module over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics.