Interagir com o Azure Cosmos DB com o Apache Spark 2 no Azure Synapse Link

Nota

Para obter Azure Synapse Link para o Azure Cosmos DB com o Spark 3, veja este artigo Azure Synapse Link para o Azure Cosmos DB no Spark 3

Neste artigo, irá aprender a interagir com o Azure Cosmos DB com o Synapse Apache Spark 2. Com o suporte total para Scala, Python, SparkSQL e C#, o Synapse Apache Spark é fundamental para cenários de análise, engenharia de dados, ciência de dados e exploração de dados no Azure Synapse Link para o Azure Cosmos DB.

As seguintes capacidades são suportadas ao interagir com o Azure Cosmos DB:

  • O Synapse Apache Spark permite-lhe analisar dados nos contentores do Azure Cosmos DB que estão ativados com o Azure Synapse Link quase em tempo real sem afetar o desempenho das cargas de trabalho transacionais. As duas opções seguintes estão disponíveis para consultar o arquivo analítico do Azure Cosmos DB a partir do Spark:
    • Carregar para o DataFrame do Spark
    • Criar tabela do Spark
  • O Synapse Apache Spark também lhe permite ingerir dados no Azure Cosmos DB. É importante ter em atenção que os dados são sempre ingeridos em contentores do Azure Cosmos DB através do arquivo transacional. Quando Synapse Link está ativada, todas as novas inserções, atualizações e eliminações são automaticamente sincronizadas com o arquivo analítico.
  • O Synapse Apache Spark também suporta a transmissão em fluxo estruturada do Spark com o Azure Cosmos DB como uma origem, bem como um sink.

As secções seguintes explicam-lhe a sintaxe das capacidades acima. Também pode consultar o módulo do Learn sobre como Consultar o Azure Cosmos DB com o Apache Spark para Azure Synapse Analytics. Os gestos na área de trabalho do Azure Synapse Analytics foram concebidos para proporcionar uma experiência simples de começar. Os gestos são visíveis quando clica com o botão direito do rato num contentor do Azure Cosmos DB no separador Dados da área de trabalho do Synapse. Com os gestos, pode gerar rapidamente código e personalizá-lo de acordo com as suas necessidades. Os gestos também são perfeitos para detetar dados com um único clique.

Importante

Deve estar ciente de algumas restrições no esquema analítico que podem levar a um comportamento inesperado nas operações de carregamento de dados. Por exemplo, apenas as primeiras 1000 propriedades do esquema transacional estão disponíveis no esquema analítico, as propriedades com espaços não estão disponíveis, etc. Se estiver a ter alguns resultados inesperados, verifique as restrições de esquema do arquivo analítico para obter mais detalhes.

Consultar o arquivo analítico do Azure Cosmos DB

Antes de saber mais sobre as duas opções possíveis para consultar o arquivo analítico do Azure Cosmos DB, carregar para o DataFrame do Apache Spark e criar uma tabela do Spark, vale a pena explorar as diferenças de experiência para que possa escolher a opção que funciona para as suas necessidades.

A diferença de experiência é saber se as alterações de dados subjacentes no contentor do Azure Cosmos DB devem ser refletidas automaticamente na análise realizada no Spark. Quando um DataFrame do Apache Spark é registado ou uma tabela do Spark é criada no arquivo analítico de um contentor, os metadados em torno do instantâneo atual dos dados no arquivo analítico são obtidos para o Spark para um pushdown eficiente da análise subsequente. É importante ter em atenção que, uma vez que o Spark segue uma política de avaliação preguiçosa, a menos que seja invocada uma ação no DataFrame do Apache Spark ou que uma consulta SparkSQL seja executada na tabela do Spark, os dados reais não são obtidos a partir do arquivo analítico do contentor subjacente.

No caso do carregamento para o DataFrame do Spark, os metadados obtidos são colocados em cache durante a duração da sessão do Spark e, portanto, as ações subsequentes invocadas no DataFrame são avaliadas relativamente ao instantâneo do arquivo analítico no momento da criação do DataFrame.

Por outro lado, no caso da criação de uma tabela do Spark, os metadados do estado do arquivo analítico não são colocados em cache no Spark e são recarregados em cada execução de consulta SQL do Spark na tabela do Spark.

Assim sendo, pode escolher entre o carregamento para o DataFrame do Spark e a criação de uma tabela do Spark consoante queira que a análise do Spark seja avaliada relativamente a um instantâneo fixo do arquivo analítico ou ao instantâneo mais recente do arquivo analítico, respetivamente.

Se as suas consultas analíticas tiverem utilizado filtros frequentemente, tem a opção de particionar com base nestes campos para melhorar o desempenho das consultas. Pode executar periodicamente a tarefa de criação de partições a partir de um bloco de notas do Azure Synapse Spark para acionar a criação de partições no arquivo analítico. Este arquivo particionado aponta para a conta de armazenamento primária do ADLS Gen2 que está ligada à área de trabalho Azure Synapse. Para saber mais, veja a introdução à criação de partições personalizadas e como configurar artigos de criação de partições personalizados .

Nota

Para consultar contas do Azure Cosmos DB para MongoDB, saiba mais sobre a representação do esquema de fidelidade total no arquivo analítico e os nomes de propriedades expandidas a utilizar.

Nota

Tenha em atenção que todos os options comandos abaixo são sensíveis a maiúsculas e minúsculas. Por exemplo, tem de utilizar Gateway enquanto gateway irá devolver um erro.

Carregar para o DataFrame do Spark

Neste exemplo, irá criar um DataFrame do Spark que aponta para o arquivo analítico do Azure Cosmos DB. Em seguida, pode efetuar análises adicionais ao invocar ações do Spark no DataFrame. Esta operação não afeta o arquivo transacional.

A sintaxe em Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A sintaxe equivalente em Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Criar tabela do Spark

Neste exemplo, irá criar uma tabela do Spark que aponta o arquivo analítico do Azure Cosmos DB. Em seguida, pode efetuar análises adicionais ao invocar consultas SparkSQL na tabela. Esta operação não afeta o arquivo transacional nem incorre em movimentos de dados. Se decidir eliminar esta tabela do Spark, o contentor subjacente do Azure Cosmos DB e o arquivo analítico correspondente não serão afetados.

Este cenário é conveniente para reutilizar tabelas do Spark através de ferramentas de terceiros e fornecer acessibilidade aos dados subjacentes durante o tempo de execução.

A sintaxe para criar uma tabela do Spark é a seguinte:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Nota

Se tiver cenários em que o esquema do contentor do Azure Cosmos DB subjacente é alterado ao longo do tempo; e se quiser que o esquema atualizado reflita automaticamente nas consultas na tabela do Spark, pode fazê-lo ao definir a opção spark.cosmos.autoSchemaMerge como true nas opções de tabela do Spark.

Escrever o DataFrame do Spark no contentor do Azure Cosmos DB

Neste exemplo, vai escrever um DataFrame do Spark num contentor do Azure Cosmos DB. Esta operação afetará o desempenho das cargas de trabalho transacionais e consumirá unidades de pedido aprovisionadas no contentor do Azure Cosmos DB ou na base de dados partilhada.

A sintaxe em Python seria a seguinte:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

A sintaxe equivalente em Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Carregar o DataFrame de transmissão em fluxo a partir do contentor

Neste gesto, irá utilizar a capacidade de Transmissão em Fluxo do Spark para carregar dados de um contentor para um dataframe. Os dados serão armazenados na conta principal do data lake (e no sistema de ficheiros) que ligou à área de trabalho.

Nota

Se quiser referenciar bibliotecas externas no Synapse Apache Spark, saiba mais aqui. Por exemplo, se quiser ingerir um DataFrame do Spark num contentor do Azure Cosmos DB para MongoDB, pode utilizar o conector mongoDB para Spark.

Carregar o DataFrame de transmissão em fluxo a partir do contentor do Azure Cosmos DB

Neste exemplo, irá utilizar a capacidade de transmissão em fluxo estruturada do Spark para carregar dados de um contentor do Azure Cosmos DB para um DataFrame de transmissão em fluxo do Spark com a funcionalidade do feed de alterações no Azure Cosmos DB. Os dados de ponto de verificação utilizados pelo Spark serão armazenados na conta principal do data lake (e no sistema de ficheiros) que ligou à área de trabalho.

Se a pasta /localReadCheckpointFolder não for criada (no exemplo abaixo), será criada automaticamente. Esta operação afetará o desempenho das cargas de trabalho transacionais e consumirá Unidades de Pedido aprovisionadas no contentor do Azure Cosmos DB ou na base de dados partilhada.

A sintaxe em Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

A sintaxe equivalente em Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Escrever o DataFrame de transmissão em fluxo no contentor do Azure Cosmos DB

Neste exemplo, vai escrever um DataFrame de transmissão em fluxo num contentor do Azure Cosmos DB. Esta operação afetará o desempenho das cargas de trabalho transacionais e consumirá Unidades de Pedido aprovisionadas no contentor do Azure Cosmos DB ou na base de dados partilhada. Se a pasta /localWriteCheckpointFolder não for criada (no exemplo abaixo), será criada automaticamente.

A sintaxe em Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

A sintaxe equivalente em Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Passos seguintes