Interagire con Azure Cosmos DB usando Apache Spark 2 in Collegamento ad Azure Synapse

Nota

Per Collegamento ad Azure Synapse per Azure Cosmos DB con Spark 3, vedere questo articolo Collegamento ad Azure Synapse per Azure Cosmos DB in Spark 3

In questo articolo verrà illustrato come interagire con Azure Cosmos DB usando Apache Spark 2 per Synapse. Grazie al supporto completo per Scala, Python, SparkSQL e C#, Apache Spark per Synapse è fondamentale per gli scenari di analisi, ingegneria dei dati, data science e di esplorazione dei dati nel collegamento ad Azure Synapse per Azure Cosmos DB.

Le funzionalità seguenti sono supportate durante l'interazione con Azure Cosmos DB:

  • Apache Spark per Synapse consente di analizzare i dati nei contenitori Azure Cosmos DB abilitati con il collegamento ad Azure Synapse quasi in tempo reale senza influire sulle prestazioni dei carichi di lavoro transazionali. Per eseguire una query sull'archivio analitico di Azure Cosmos DB da Spark sono disponibili le due opzioni seguenti:
    • Creazione di un frame di dati Spark
    • Creazione di una tabella Spark
  • Apache Spark per Synapse consente inoltre di inserire dati in Azure Cosmos DB. È importante notare che i dati vengono sempre inseriti in contenitori di Azure Cosmos DB tramite l'archivio transazionale. Quando il collegamento a Synapse è abilitato, eventuali nuovi inserimenti, aggiornamenti ed eliminazioni vengono sincronizzati automaticamente con l'archivio analitico.
  • Apache Spark per Synapse supporta anche lo streaming strutturato Spark con Azure Cosmos DB come origine, oltre a un sink.

Le sezioni seguenti illustrano la sintassi delle funzionalità indicate precedentemente. È anche possibile eseguire il checkout del modulo Learn su come eseguire query su Azure Cosmos DB con Apache Spark per Azure Synapse Analytics. I movimenti nell'area di lavoro di Azure Synapse Analytics sono progettati per offrire una semplice esperienza predefinita per iniziare. I movimenti sono visibili quando si fa clic con il pulsante destro del mouse su un contenitore Azure Cosmos DB nella scheda Dati dell'area di lavoro di Synapse. Con i movimenti è possibile generare rapidamente il codice e modificarlo in base alle esigenze. Sono inoltre perfetti per l'individuazione dei dati con un singolo clic.

Importante

È necessario tenere presente alcuni vincoli nello schema analitico che potrebbero causare un comportamento imprevisto nelle operazioni di caricamento dei dati. Ad esempio, solo le prime 1000 proprietà dello schema transazionale sono disponibili nello schema analitico, le proprietà con spazi non sono disponibili e così via. Se si verificano risultati imprevisti, controllare i vincoli dello schema dell'archivio analitico per altri dettagli.

Eseguire query sull'archivio analitico di Azure Cosmos DB

Prima di acquisire informazioni sulle due opzioni possibili per eseguire query sull'archivio analitico di Azure Cosmos DB, ovvero mediante il caricamento nel dataframe Spark e la creazione di una tabella Spark, è opportuno esplorare le differenze dell'esperienza, in modo da poter scegliere l'opzione adatta alle proprie esigenze.

La differenza nell'esperienza si verifica se le modifiche ai dati nel contenitore di Azure Cosmos DB devono essere riflesse automaticamente nell'analisi eseguita in Spark. Quando un dataframe Spark viene registrato o viene creata una tabella Spark rispetto all'archivio analitico di un contenitore, i metadati relativi allo snapshot corrente dei dati nell'archivio analitico vengono recuperati in Spark per una distribuzione efficiente dell'analisi successiva. È importante tenere presente che, poiché Spark segue i criteri di valutazione lazy, a meno che non venga richiamata un'azione nel dataframe Spark o venga eseguita una query SparkSQL sulla tabella Spark, i dati effettivi non vengono recuperati dall'archivio analitico del contenitore sottostante.

In caso di caricamento nel dataframe Spark, i metadati recuperati vengono memorizzati nella cache per tutta la durata della sessione Spark e quindi le azioni successive richiamate nel dataframe vengono valutate rispetto allo snapshot dell'archivio analitico al momento della creazione del dataframe.

D'altra parte, in aso di creazione di una tabella Spark, i metadati dello stato dell'archivio analitico non vengono memorizzati nella cache in Spark e vengono ricaricati a ogni esecuzione della query SparkSQL sulla tabella Spark.

Pertanto, è possibile scegliere tra il caricamento nel dataframe Spark e la creazione di una tabella Spark a seconda che si desideri che l'analisi Spark venga valutata in base a uno snapshot fisso dell'archivio analitico o allo snapshot più recente dell'archivio analitico.

Se le query analitiche hanno filtri usati di frequente, è possibile partizionare in base a questi campi per migliorare le prestazioni delle query. È possibile eseguire periodicamente il processo di partizionamento da un notebook di Azure Synapse Spark per attivare il partizionamento nell'archivio analitico. Questo archivio partizionato punta all'account di archiviazione primario di ADLS Gen2 collegato all'area di lavoro di Azure Synapse. Per altre informazioni, vedere gli articoli Introduzione al partizionamento personalizzato e Come configurare il partizionamento personalizzato.

Nota

Per eseguire una query sugli account Azure Cosmos DB for MongoDB, è possibile ottenere altre informazioni sulla rappresentazione dello schema con fedeltà completa nell'archivio analitico e sui nomi di proprietà estese da usare.

Nota

Si noti che options nei comandi seguenti fanno distinzione tra maiuscole e minuscole. Ad esempio, è necessario usare Gateway mentre gateway restituirà un errore.

Creazione di un frame di dati Spark

In questo esempio verrà creato un dataframe Spark che punta all'archivio analitico di Azure Cosmos DB. È quindi possibile eseguire un'analisi aggiuntiva richiamando le azioni Spark sul dataframe. Questa operazione non ha alcun impatto sull'archivio transazionale.

La sintassi in Python è la seguente:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

La sintassi equivalente in Scala sarà la seguente:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Creazione di una tabella Spark

In questo esempio verrà creata una tabella Spark che punta all'archivio analitico di Azure Cosmos DB. È quindi possibile eseguire un'analisi aggiuntiva richiamando le query SparkSQL sulla tabella. Questa operazione non ha alcun effetto sull'archivio transazionale, né comporta alcuno spostamento dei dati. Se si decide di eliminare la tabella Spark, il contenitore Azure Cosmos DB sottostante e l'archivio analitico corrispondente non saranno interessati.

Questo scenario è utile per riusare le tabelle Spark tramite strumenti di terze parti e fornire accessibilità ai dati sottostanti per il runtime.

La sintassi per creare una tabella Spark è la seguente:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Nota

Se si hanno scenari in cui lo schema del contenitore Azure Cosmos DB sottostante cambia nel tempo e se si vuole che lo schema aggiornato rifletta automaticamente le query sulla tabella Spark, è possibile ottenere questo risultato impostando l'opzione spark.cosmos.autoSchemaMerge su true nelle opzioni della tabella Spark.

Scrivere un dataframe Spark sul contenitore Azure Cosmos DB

In questo esempio si scriverà un dataframe Spark su un contenitore Azure Cosmos DB. Questa operazione influirà sulle prestazioni dei carichi di lavoro transazionali e userà le unità richieste sottoposte a provisioning sul contenitore Azure Cosmos DB o sul database condiviso.

La sintassi in Python è la seguente:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

La sintassi equivalente in Scala sarà la seguente:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Caricamento di dataframe in streaming da un contenitore

In questo gesto si userà la funzionalità di streaming di Spark per caricare i dati da un contenitore in un dataframe. I dati verranno archiviati nell'account Data Lake primario (e nel file system) connesso all'area di lavoro.

Nota

Altre informazioni su come fare riferimento alle librerie esterne in Synapse Apache Spark, sono disponibili qui. Ad esempio, se si sta cercando di inserire un dataframe Spark su un contenitore di Azure Cosmos DB for MongoDB, è possibile usare il connettore MongoDB per Spark.

Caricamento di dataframe in streaming da un contenitore Azure Cosmos DB

In questo esempio si userà la funzionalità di streaming strutturato di Spark per caricare i dati da un contenitore Azure Cosmos DB su un dataframe di streaming Spark usando la funzionalità del feed di modifiche in Azure Cosmos DB. I dati del checkpoint usati da Spark verranno archiviati nell'account data lake primario (e nel file system) connesso all'area di lavoro.

Se la cartella /localReadCheckpointFolder non è stata creata (nell'esempio sottostante), verrà creata automaticamente. Questa operazione influirà sulle prestazioni dei carichi di lavoro transazionali e userà le unità richieste sottoposte a provisioning sul contenitore Azure Cosmos DB o sul database condiviso.

La sintassi in Python è la seguente:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

La sintassi equivalente in Scala sarà la seguente:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Scrittura di dataframe in streaming su un contenitore Azure Cosmos DB

In questo esempio si scriverà un dataframe in streaming su un contenitore Azure Cosmos DB. Questa operazione influirà sulle prestazioni dei carichi di lavoro transazionali e userà le unità richieste sottoposte a provisioning sul contenitore Azure Cosmos DB o sul database condiviso. Se la cartella /localWriteCheckpointFolder non è stata creata (nell'esempio sottostante), verrà creata automaticamente.

La sintassi in Python è la seguente:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

La sintassi equivalente in Scala sarà la seguente:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Passaggi successivi