Interactie met Azure Cosmos DB met Apache Spark 3 in Azure Synapse Link

In dit artikel leert u hoe u kunt communiceren met Azure Cosmos DB met behulp van Synapse Apache Spark 3. Synapse Apache Spark 3 biedt volledige ondersteuning voor Scala, Python, SparkSQL en C#. Synapse Apache Spark 3 is een centrale rol in scenario's voor analyse, data engineering, gegevenswetenschap en gegevensverkenning in Azure Synapse Link voor Azure Cosmos DB.

De volgende mogelijkheden worden ondersteund bij interactie met Azure Cosmos DB:

  • Met Synapse Apache Spark 3 kunt u gegevens analyseren in uw Azure Cosmos DB-containers die zijn ingeschakeld met Azure Synapse Link in bijna realtime zonder dat dit van invloed is op de prestaties van uw transactionele workloads. De volgende twee opties zijn beschikbaar voor het opvragen van de Azure Cosmos DB-analytische opslag van Spark:
    • Laden naar Spark DataFrame
    • Spark-tabel maken
  • Met Synapse Apache Spark kunt u ook gegevens opnemen in Azure Cosmos DB. Het is belangrijk te weten dat gegevens altijd worden opgenomen in Azure Cosmos DB-containers via de transactionele opslag. Wanneer Synapse Link is ingeschakeld, worden nieuwe toevoegingen, updates en verwijderingen automatisch gesynchroniseerd met de analytische opslag.
  • Synapse Apache Spark biedt ook ondersteuning voor Spark Structured Streaming met Azure Cosmos DB als een bron en een sink.

In de volgende secties wordt stapsgewijs uitgelegd hoe u de syntaxis van de bovenstaande mogelijkheden kunt volgen. U kunt ook de Learn-module bekijken over het uitvoeren van query's op Azure Cosmos DB met Apache Spark voor Azure Synapse Analytics. Bewegingen in Azure Synapse Analytics-werkruimte zijn ontworpen om een eenvoudige out-of-the-box-ervaring te bieden om aan de slag te gaan. Gebaren worden weergegeven wanneer u met de rechtermuisknop op een Azure Cosmos DB-container klikt op het tabblad Gegevens van de Synapse-werkruimte. Met bewegingen kunt u snel code genereren en deze aanpassen aan uw behoeften. Bewegingen zijn ook ideaal om met één klik gegevens te ontdekken.

Belangrijk

Houd rekening met enkele beperkingen in het analytische schema die kunnen leiden tot onverwacht gedrag bij het laden van gegevens. In het analytische schema zijn bijvoorbeeld slechts de eerste 1000 eigenschappen van het transactionele schema beschikbaar, eigenschappen met spaties zijn niet beschikbaar, enzovoort. Als u onverwachte resultaten ondervindt, controleert u de schemabeperkingen voor analytische opslag voor meer informatie.

Analytische opslag van Azure Cosmos DB

Voordat u meer informatie krijgt over de twee mogelijke opties om een query uit te brengen op Azure Cosmos DB-analytische opslag, het laden van Spark DataFrame en het maken van een Spark-tabel, is het een goed idee om de opties in de ervaring te verkennen zodat u de optie kunt kiezen die geschikt is voor uw behoeften.

Het verschil in ervaring is om te bepalen of onderliggende gegevenswijzigingen in de Azure Cosmos DB-container automatisch moeten worden doorgevoerd in de analyse die in Spark wordt uitgevoerd. Wanneer een Spark DataFrame wordt geregistreerd of een Spark-tabel wordt gemaakt op basis van de analytische opslag van een container, worden de metagegevens van de huidige momentopname van de gegevens in het analytische archief naar Spark opgehaald voor efficiënte pushdown van de volgende analyse. Het is belangrijk te weten dat, omdat Spark een lui evaluatiebeleid volgt, tenzij een actie wordt aangeroepen op de Spark-DataFrame of een SparkSQL-query wordt uitgevoerd op basis van de Spark-tabel, de werkelijke gegevens niet worden opgehaald uit de analytische opslag van de onderliggende container.

In het geval van het laden naar Spark DataFrame, worden de opgehaalde metagegevens in de cache opgeslagen tijdens de levensduur van de Spark-sessie. Daarom worden de volgende acties die worden uitgevoerd op de DataFrame geëvalueerd op basis van de momentopname van de analytische opslag op het moment dat DataFrame wordt gemaakt.

Daarentegen, in het geval van het maken van een Spark-tabel, worden de metagegevens van de status van de analytische opslag niet in de cache opgeslagen in Spark en worden opnieuw geladen op elke SparkSQL-query-uitvoering in de Spark-tabel.

Daarom kunt u kiezen tussen het laden van Spark DataFrame en het maken van een Spark-tabel op basis van de vraag of u wilt dat uw Spark-analyse wordt geëvalueerd voor respectievelijk een vaste momentopname van de analytische opslag of met de meest recente momentopname van de analytische opslag.

Notitie

Als u een query wilt uitvoeren op Azure Cosmos DB voor MongoDB-accounts, vindt u meer informatie over de weergave van het volledige betrouwbaarheidsschema in de analytische opslag en de namen van uitgebreide eigenschappen die moeten worden gebruikt.

Notitie

Houd er rekening mee dat alle options onderstaande opdrachten hoofdlettergevoelig zijn.

Laden naar Spark DataFrame

In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op de transactionele opslag.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark-tabel maken

In dit voorbeeld maakt u een Spark-DataFrame dat verwijst naar de Azure Cosmos DB-analytische opslag. U kunt vervolgens extra analyses uitvoeren door Spark-acties aan te roepen voor het DataFrame. Deze bewerking heeft geen invloed op het transactionele archief en maakt geen gegevensverplaatsing. Als u besluit deze Spark-tabel te verwijderen, worden de onderliggende Azure Cosmos DB-container en de bijbehorende analytische opslag niet beïnvloed.

Dit scenario is handig voor hergebruik van Spark-tabellen via hulpprogramma's van derden en om de gegevens voor de uitvoering toegankelijk te maken.

De syntaxis voor het maken van een Spark-tabel is als volgt:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Notitie

Als u scenario's hebt waarin het schema van de onderliggende Azure Cosmos DB-container in de loop van de tijd verandert; En als u wilt dat het bijgewerkte schema automatisch wordt weergegeven in de query's op basis van de Spark-tabel, kunt u dit doen door de spark.cosmos.autoSchemaMerge optie in te true stellen op in de spark-tabelopties.

Spark DataFrame naar Azure Cosmos DB-container schrijven

In dit voorbeeld schrijft u een Spark DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of de gedeelde database.

De syntaxis van Python is als volgt:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Streaming-dataframe laden vanuit een container

Met deze beweging gebruikt u de mogelijkheid van Spark-streaming om gegevens uit een container in een dataframe te laden. De gegevens worden opgeslagen in het primaire Data Lake-account (en bestandssysteem) dat u hebt gekoppeld aan de werkruimte.

Notitie

Als u op zoek bent naar externe bibliotheken in Synapse Apache Spark, vindt u hier meer informatie. Als u bijvoorbeeld een Spark DataFrame wilt opnemen in een container van Azure Cosmos DB voor MongoDB, kunt u hier gebruikmaken van de MongoDB-connector voor Spark.

Streaming-DataFrame laden vanuit een Azure Cosmos DB-container

In dit voorbeeld gebruikt u de structured streaming-mogelijkheid van Spark voor het laden van gegevens uit een Azure Cosmos DB-container naar een Spark-streaming DataFrame met behulp van de wijzigingsfunctie voor feeds in Azure Cosmos DB. De controlepuntgegevens die worden gebruikt door Spark, worden opgeslagen in het primaire datalake-account (en bestandssysteem) dat u hebt verbonden met de werkruimte.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Streaming-DataFrame schrijven naar een Azure Cosmos DB-container

In dit voorbeeld schrijft u een streaming-DataFrame naar een Azure Cosmos DB-container. Deze bewerking heeft invloed op de prestaties van transactionele workloads en verbruikt aanvraageenheden die zijn ingericht op de Azure Cosmos DB-container of gedeelde database. Als de map /localWriteCheckpointFolder niet is gemaakt (in het onderstaande voorbeeld), gebeurt dit automatisch.

De syntaxis van Python is als volgt:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

De equivalente syntaxis in Scala is de volgende:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Volgende stappen