Interagir com o Azure Cosmos DB com o Apache Spark 3 no Azure Synapse Link

Neste artigo, irá aprender a interagir com o Azure Cosmos DB com o Synapse Apache Spark 3. Com o suporte total do Scala, Python, SparkSQL e C#, o Synapse Apache Spark 3 é fundamental para cenários de análise, engenharia de dados, ciência de dados e exploração de dados no Azure Synapse Link para o Azure Cosmos DB.

As seguintes capacidades são suportadas ao interagir com o Azure Cosmos DB:

  • O Synapse Apache Spark 3 permite-lhe analisar dados nos contentores do Azure Cosmos DB que estão ativados com o Azure Synapse Link quase em tempo real sem afetar o desempenho das cargas de trabalho transacionais. As duas opções seguintes estão disponíveis para consultar o arquivo analítico do Azure Cosmos DB a partir do Spark:
    • Carregar para o DataFrame do Spark
    • Criar tabela do Spark
  • O Synapse Apache Spark também lhe permite ingerir dados no Azure Cosmos DB. É importante ter em atenção que os dados são sempre ingeridos em contentores do Azure Cosmos DB através do arquivo transacional. Quando Synapse Link estiver ativada, todas as novas inserções, atualizações e eliminações são sincronizadas automaticamente com o arquivo analítico.
  • O Synapse Apache Spark também suporta a transmissão em fluxo estruturada do Spark com o Azure Cosmos DB como uma origem, bem como um sink.

As secções seguintes explicam-lhe a sintaxe das capacidades acima. Também pode ver o módulo do Learn sobre como Consultar o Azure Cosmos DB com o Apache Spark para Azure Synapse Analytics. Os gestos na área de trabalho do Azure Synapse Analytics foram concebidos para proporcionar uma experiência fácil de começar. Os gestos são visíveis quando clica com o botão direito do rato num contentor do Azure Cosmos DB no separador Dados da área de trabalho do Synapse. Com os gestos, pode gerar rapidamente código e personalizá-lo de acordo com as suas necessidades. Os gestos também são perfeitos para detetar dados com um único clique.

Importante

Deve estar ciente de algumas restrições no esquema analítico que podem levar ao comportamento inesperado nas operações de carregamento de dados. Por exemplo, apenas as primeiras 1000 propriedades do esquema transacional estão disponíveis no esquema analítico, as propriedades com espaços não estão disponíveis, etc. Se estiver a ter alguns resultados inesperados, verifique as restrições de esquema do arquivo analítico para obter mais detalhes.

Consultar o arquivo analítico do Azure Cosmos DB

Antes de saber mais sobre as duas opções possíveis para consultar o arquivo analítico do Azure Cosmos DB, carregar para o DataFrame do Spark e criar uma tabela do Spark, vale a pena explorar as diferenças de experiência para que possa escolher a opção que funciona para as suas necessidades.

A diferença de experiência é saber se as alterações de dados subjacentes no contentor do Azure Cosmos DB devem ser refletidas automaticamente na análise efetuada no Spark. Quando um DataFrame do Spark é registado ou uma tabela do Spark é criada no arquivo analítico de um contentor, os metadados em torno do instantâneo atual dos dados no arquivo analítico são obtidos para o Spark para uma redução eficiente da análise subsequente. É importante ter em atenção que, uma vez que o Spark segue uma política de avaliação preguiçosa, a menos que seja invocada uma ação no DataFrame do Spark ou uma consulta SparkSQL seja executada na tabela do Spark, os dados reais não são obtidos a partir do arquivo analítico do contentor subjacente.

No caso do carregamento para o DataFrame do Spark, os metadados obtidos são colocados em cache durante a duração da sessão do Spark e, portanto, as ações subsequentes invocadas no DataFrame são avaliadas relativamente ao instantâneo do arquivo analítico no momento da criação do DataFrame.

Por outro lado, no caso da criação de uma tabela do Spark, os metadados do estado do arquivo analítico não são colocados em cache no Spark e são recarregados em cada execução de consulta SQL do Spark na tabela do Spark.

Assim sendo, pode escolher entre o carregamento para o DataFrame do Spark e a criação de uma tabela do Spark consoante queira que a análise do Spark seja avaliada relativamente a um instantâneo fixo do arquivo analítico ou ao instantâneo mais recente do arquivo analítico, respetivamente.

Nota

Para consultar contas do Azure Cosmos DB para MongoDB, saiba mais sobre a representação de esquema de fidelidade total no arquivo analítico e os nomes de propriedades expandidos a utilizar.

Nota

Tenha em atenção que todos os options comandos abaixo são sensíveis às maiúsculas e minúsculas.

Carregar para o DataFrame do Spark

Neste exemplo, irá criar um DataFrame do Spark que aponta para o arquivo analítico do Azure Cosmos DB. Em seguida, pode efetuar análises adicionais ao invocar ações do Spark no DataFrame. Esta operação não afeta o arquivo transacional.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Criar tabela do Spark

Neste exemplo, irá criar uma tabela do Spark que aponta para o arquivo analítico do Azure Cosmos DB. Em seguida, pode efetuar análises adicionais ao invocar consultas SparkSQL na tabela. Esta operação não afeta o arquivo transacional nem incorre em qualquer movimento de dados. Se decidir eliminar esta tabela do Spark, o contentor subjacente do Azure Cosmos DB e o arquivo analítico correspondente não serão afetados.

Este cenário é conveniente para reutilizar tabelas do Spark através de ferramentas de terceiros e fornecer acessibilidade aos dados subjacentes para o tempo de execução.

A sintaxe para criar uma tabela do Spark é a seguinte:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Nota

Se tiver cenários em que o esquema do contentor subjacente do Azure Cosmos DB é alterado ao longo do tempo; e se quiser que o esquema atualizado reflita automaticamente nas consultas na tabela do Spark, pode fazê-lo ao definir a opção spark.cosmos.autoSchemaMerge para true nas opções da tabela do Spark.

Escrever DataFrame do Spark no contentor do Azure Cosmos DB

Neste exemplo, vai escrever um DataFrame do Spark num contentor do Azure Cosmos DB. Esta operação afetará o desempenho das cargas de trabalho transacionais e consumirá unidades de pedido aprovisionadas no contentor do Azure Cosmos DB ou na base de dados partilhada.

A sintaxe no Python seria a seguinte:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Carregar o DataFrame de transmissão a partir do contentor

Neste gesto, irá utilizar a capacidade de Transmissão em Fluxo do Spark para carregar dados de um contentor para um dataframe. Os dados serão armazenados na conta principal do data lake (e sistema de ficheiros) que ligou à área de trabalho.

Nota

Se quiser fazer referência a bibliotecas externas no Synapse Apache Spark, saiba mais aqui. Por exemplo, se quiser ingerir um DataFrame do Spark num contentor do Azure Cosmos DB para MongoDB, pode tirar partido do conector mongoDB para o Spark aqui.

Carregar o DataFrame a partir do contentor do Azure Cosmos DB

Neste exemplo, irá utilizar a capacidade de transmissão em fluxo estruturada do Spark para carregar dados de um contentor do Azure Cosmos DB para um DataFrame de transmissão em fluxo do Spark com a funcionalidade de feed de alterações no Azure Cosmos DB. Os dados de ponto de verificação utilizados pelo Spark serão armazenados na conta principal do data lake (e sistema de ficheiros) que ligou à área de trabalho.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Escrever o DataFrame de transmissão em fluxo no contentor do Azure Cosmos DB

Neste exemplo, vai escrever um DataFrame de transmissão em fluxo num contentor do Azure Cosmos DB. Esta operação afetará o desempenho das cargas de trabalho transacionais e consumirá Unidades de Pedido aprovisionadas no contentor do Azure Cosmos DB ou na base de dados partilhada. Se a pasta /localWriteCheckpointFolder não for criada (no exemplo abaixo), será criada automaticamente.

A sintaxe no Python seria a seguinte:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

A sintaxe equivalente no Scala seria a seguinte:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Passos seguintes