Interagera med Azure Cosmos DB med Apache Spark 2 i Azure Synapse Link

Anteckning

Information om Azure Synapse Link för Azure Cosmos DB med Spark 3 finns i den här artikeln Azure Synapse Link för Azure Cosmos DB på Spark 3

I den här artikeln får du lära dig hur du interagerar med Azure Cosmos DB med Synapse Apache Spark 2. Synapse Apache Spark har fullt stöd för Scala, Python, SparkSQL och C# och är centralt för scenarier för analys, datateknik, datavetenskap och datautforskning i Azure Synapse Link för Azure Cosmos DB.

Följande funktioner stöds när du interagerar med Azure Cosmos DB:

  • Med Synapse Apache Spark kan du analysera data i dina Azure Cosmos DB-containrar som är aktiverade med Azure Synapse Link nästan i realtid utan att påverka prestandan för dina transaktionsarbetsbelastningar. Följande två alternativ är tillgängliga för att köra frågor mot Azure Cosmos DB-analysarkivet från Spark:
    • Läsa in till Spark DataFrame
    • Skapa Spark-tabell
  • Med Synapse Apache Spark kan du också mata in data i Azure Cosmos DB. Observera att data alltid matas in i Azure Cosmos DB-containrar via transaktionslagret. När Synapse Link är aktiverat synkroniseras alla nya infogningar, uppdateringar och borttagningar automatiskt till analysarkivet.
  • Synapse Apache Spark har också stöd för Spark-strukturerad direktuppspelning med Azure Cosmos DB som källa och mottagare.

I följande avsnitt går vi igenom syntaxen för ovanstående funktioner. Du kan också kolla in Learn-modulen om hur du frågar Azure Cosmos DB med Apache Spark för Azure Synapse Analytics. Gester i Azure Synapse Analytics-arbetsytan är utformade för att ge en enkel välkomstupplevelse för att komma igång. Gester visas när du högerklickar på en Azure Cosmos DB-container på fliken Data på Synapse-arbetsytan. Med gester kan du snabbt generera kod och skräddarsy den efter dina behov. Gester passar också perfekt för att upptäcka data med ett enda klick.

Viktigt

Du bör vara medveten om vissa begränsningar i analysschemat som kan leda till oväntat beteende vid datainläsningsåtgärder. Till exempel är endast de första 1 000 egenskaperna från transaktionsschemat tillgängliga i analysschemat, egenskaper med blanksteg är inte tillgängliga osv. Om du får oväntade resultat kontrollerar du schemabegränsningarna för analysarkivet för mer information.

Fråga Azure Cosmos DB-analysarkiv

Innan du lär dig mer om de två möjliga alternativen för att fråga Azure Cosmos DB-analysarkiv, läsa in till Spark DataFrame och skapa Spark-tabell, är det värt att utforska skillnaderna i upplevelse så att du kan välja det alternativ som fungerar för dina behov.

Skillnaden i erfarenhet handlar om huruvida underliggande dataändringar i Azure Cosmos DB-containern ska återspeglas automatiskt i analysen som utförs i Spark. När antingen en Spark DataFrame registreras eller en Spark-tabell skapas mot en containers analysarkiv, hämtas metadata runt den aktuella ögonblicksbilden av data i analysarkivet till Spark för effektiv pushdown av efterföljande analys. Det är viktigt att observera att eftersom Spark följer en lat utvärderingsprincip, såvida inte en åtgärd anropas på Spark DataFrame eller en SparkSQL-fråga körs mot Spark-tabellen, hämtas inte faktiska data från den underliggande containerns analysarkiv.

Vid inläsning till Spark DataFrame cachelagras hämtade metadata under Spark-sessionens livstid och därmed utvärderas efterföljande åtgärder som utförs för DataFrame mot ögonblicksbilden av analysarkivet vid tidpunkten då DataFrame skapades.

Men om en Spark-tabell skapas cachelagras inte metadata för analysarkivet i Spark utan läses in på nytt vid varje SparkSQL-frågekörning mot Spark-tabellen.

Det innebär att du kan välja mellan att läsa in till Spark DataFrame och att skapa en Spark-tabell beroende på om du vill att din Spark-analys ska utvärderas mot en fast ögonblicksbild av analysarkivet eller mot den senaste ögonblicksbilden av analysarkivet.

Om dina analysfrågor har filter som används ofta kan du partitionera baserat på dessa fält för bättre frågeprestanda. Du kan regelbundet köra partitioneringsjobb från en Azure Synapse Spark-notebook-fil för att utlösa partitionering i analysarkivet. Det här partitionerade arkivet pekar på det primära ADLS Gen2-lagringskontot som är länkat till din Azure Synapse arbetsyta. Mer information finns i introduktionen till anpassad partitionering och hur du konfigurerar anpassade partitioneringsartiklar .

Anteckning

Om du vill fråga Azure Cosmos DB efter MongoDB-konton kan du läsa mer om schemarepresentationen med fullständig återgivning i analysarkivet och de utökade egenskapsnamn som ska användas.

Anteckning

Observera att alla options kommandon nedan är skiftlägeskänsliga. Du måste till exempel använda Gateway när gateway returnerar ett fel.

Läsa in till Spark DataFrame

I det här exemplet skapar du en Spark DataFrame som pekar på Azure Cosmos DB-analysarkivet. Du kan sedan utföra ytterligare analys genom att anropa Spark-åtgärder mot DataFrame. Den här åtgärden påverkar inte transaktionslagret.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Skapa Spark-tabell

I det här exemplet skapar du en Spark-tabell som pekar på Azure Cosmos DB-analysarkivet. Du kan sedan utföra ytterligare analys genom att anropa SparkSQL-frågor mot tabellen. Den här åtgärden påverkar varken transaktionslagret eller medför dataflytt. Om du bestämmer dig för att ta bort den här Spark-tabellen påverkas inte den underliggande Azure Cosmos DB-containern och motsvarande analysarkiv.

Det här scenariot är praktiskt att återanvända Spark-tabeller via verktyg från tredje part och tillhandahålla tillgänglighet till underliggande data för körningen.

Syntaxen för att skapa en Spark-tabell är följande:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Anteckning

Om du har scenarier där schemat för den underliggande Azure Cosmos DB-containern ändras över tid; och om du vill att det uppdaterade schemat automatiskt ska återspeglas i frågorna mot Spark-tabellen kan du uppnå detta genom att ange spark.cosmos.autoSchemaMerge alternativet till true i Spark-tabellalternativen.

Skriva Spark DataFrame till Azure Cosmos DB-container

I det här exemplet skriver du en Spark DataFrame till en Azure Cosmos DB-container. Den här åtgärden påverkar prestandan för transaktionsarbetsbelastningar och förbrukar enheter för programbegäran som etablerats i Azure Cosmos DB-containern eller den delade databasen.

Syntaxen i Python skulle vara följande:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Läsa in strömmande DataFrame från container

I den här gesten använder du Spark Streaming-funktionen för att läsa in data från en container till en dataram. Data lagras i det primära Data Lake-kontot (och filsystemet) som du anslöt till arbetsytan.

Anteckning

Om du vill referera till externa bibliotek i Synapse Apache Spark kan du läsa mer här. Om du till exempel vill mata in en Spark DataFrame till en container i Azure Cosmos DB for MongoDB kan du använda MongoDB-anslutningsappen för Spark.

Läsa in strömmande DataFrame från Azure Cosmos DB-container

I det här exemplet använder du Sparks funktioner för strukturerad direktuppspelning för att läsa in data från en Azure Cosmos DB-container till en Spark-strömmande dataram med hjälp av ändringsflödesfunktionen i Azure Cosmos DB. Kontrollpunktsdata som används av Spark lagras i det primära Data Lake-kontot (och filsystemet) som du anslöt till arbetsytan.

Om mappen /localReadCheckpointFolder inte skapas (i exemplet nedan) skapas den automatiskt. Den här åtgärden påverkar prestandan för transaktionella arbetsbelastningar och förbrukar enheter för programbegäran som etablerats i Azure Cosmos DB-containern eller den delade databasen.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Skriva strömmande DataFrame till Azure Cosmos DB-container

I det här exemplet skriver du en strömmande DataFrame till en Azure Cosmos DB-container. Den här åtgärden påverkar prestandan för transaktionella arbetsbelastningar och förbrukar enheter för programbegäran som etablerats i Azure Cosmos DB-containern eller den delade databasen. Om mappen /localWriteCheckpointFolder inte skapas (i exemplet nedan) skapas den automatiskt.

Syntaxen i Python skulle vara följande:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Motsvarande syntax i Scala skulle vara följande:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Nästa steg