Взаимодействие с Azure Cosmos DB с помощью Apache Spark 2 в Azure Synapse Link

Примечание

Сведения о Azure Synapse Link для Azure Cosmos DB с использованием Spark 3 см. в этой статье Azure Synapse Link for Azure Cosmos DB on Spark 3

Из этой статьи вы узнаете, как взаимодействовать с Azure Cosmos DB с помощью Synapse Apache Spark 2. Благодаря полной поддержке Scala, Python, SparkSQL и C# Synapse Apache Spark является центральным элементом для аналитики, инжиниринга, обработки и анализа данных, а также для сценариев исследования данных в Azure Synapse Link для Azure Cosmos DB.

При взаимодействии с Azure Cosmos DB поддерживаются следующие возможности.

  • Synapse Apache Spark позволяет анализировать данные в контейнерах Azure Cosmos DB, которые поддерживают Azure Synapse Link практически в реальном времени, не влияя на производительность транзакционных рабочих нагрузок. Для запроса аналитического хранилища Azure Cosmos DB из Spark доступны следующие два варианта:
    • Загрузка в кадр данных Spark.
    • Создание таблицы Spark
  • Synapse Apache Spark также позволяет принимать данные в Azure Cosmos DB. Важно отметить, что данные всегда поступают в контейнеры Azure Cosmos DB через хранилище транзакций. Если включена поддержка Synapse Link, все новые операции вставки, обновления и удаления автоматически синхронизируются с аналитическим хранилищем.
  • Synapse Apache Spark также поддерживает структурированную потоковую передачу Spark в Azure Cosmos DB в качестве источника и приемника.

В следующих разделах излагается синтаксис функций, описанных выше. Вы также можете ознакомиться с модулем Learn о том, как запрашивать Azure Cosmos DB с помощью Apache Spark для Azure Synapse Analytics. Жесты в рабочей области Azure Synapse Analytics разработаны с целью предоставления простого готового интерфейса для начала работы. Чтобы просмотреть жесты, щелкните правой кнопкой мыши контейнер Azure Cosmos DB на вкладке Данные рабочей области Synapse. С помощью жестов можно быстро создать код и скорректировать его в соответствии с потребностями. Жесты также идеально подходят для обнаружения данных одним щелчком мыши.

Важно!

Вы должны учитывать некоторые ограничения в аналитической схеме, которые могут привести к непредвиденному поведению в операциях загрузки данных. Например, в аналитической схеме доступны только первые 1000 свойств из транзакционной схемы, свойства с пробелами недоступны и т. д. Если вы столкнулись с неожиданными результатами, проверьте ограничения схемы аналитического хранилища для получения дополнительных сведений.

Запрос аналитического хранилища Azure Cosmos DB

Прежде чем вы узнаете о двух возможных вариантах для запроса аналитического хранилища Azure Cosmos DB, загрузки в Spark DataFrame и создании таблицы Spark, следует ознакомиться с различиями в работе. Это позволит выбрать вариант, который лучше всего соответствует вашим потребностям.

Различия заключаются в том, должны ли изменения базовых данных в контейнере Azure Cosmos DB автоматически отражаться в анализе, выполненном в Spark. При регистрации кадра данных Spark или создании таблицы Spark для аналитического хранилища контейнера метаданные, связанные с текущим моментальным снимком данных в аналитическом хранилище, извлекаются в Spark. Это позволяет выполнить эффективное включение последующего анализа. Важно отметить, что так как Spark следует политике отложенной оценки, если только не вызвано действие в кадре данных Spark или не будет выполнен запрос SparkSQL в таблице Spark, фактические данные не будут получены из аналитического хранилища базового контейнера.

В случае загрузки в кадр данных Spark извлеченные метаданные кэшируются в течение времени существования сеанса Spark. Поэтому последующие действия, вызываемые в кадре данных, оцениваются по моментальному снимку аналитического хранилища на момент создания кадра данных.

С другой стороны, в случае создания таблицы Spark метаданные состояния аналитического хранилища не кэшируются в Spark и повторно загружаются при каждом выполнении запроса SparkSQL в таблице Spark.

Таким образом, вы можете выполнить загрузку в кадр данных Spark или создать таблицу Spark в зависимости от того, хотите ли вы оценить ваш анализ Spark по фиксированному снимку аналитического хранилища или по последнему снимку аналитического хранилища соответственно.

Если в аналитических запросах есть часто используемые фильтры, вы можете разделить их на основе этих полей, чтобы повысить производительность запроса. Вы можете периодически выполнять задание секционирования из записной книжки Spark Azure Synapse, чтобы активировать секционирование в аналитическом хранилище. Это секционированное хранилище указывает на основную учетную запись хранения ADLS 2-го поколения, связанную с рабочей областью Azure Synapse. Дополнительные сведения см. в статьях о настраиваемом секционировании и настройке настраиваемого секционирования.

Примечание

Чтобы запросить Учетные записи MongoDB в Azure Cosmos DB, ознакомьтесь с дополнительными сведениями о полном представлении схемы точности в аналитическом хранилище и именах расширенных свойств, которые необходимо использовать.

Примечание

Обратите внимание, что для всех options в приведенных ниже командах учитывается регистр. Например, необходимо использовать Gateway, потому что gateway вернет ошибку.

Загрузка в кадр данных Spark.

В этом примере вы создадите кадр данных Spark, который указывает на аналитическое хранилище Azure Cosmos DB. Затем можно выполнить дополнительный анализ, вызвав действия Spark для кадра данных. Эта операция не влияет на хранилище транзакций.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Создание таблицы Spark

В этом примере вы создадите таблицу Spark, которая указывает на аналитическое хранилище Azure Cosmos DB. Затем можно выполнить дополнительный анализ, вызвав запросы SparkSQL к таблице. Эта операция никак не влияет на хранилище транзакций и не влечет за собой перемещение данных. Если вы решили удалить эту таблицу Spark, это никак не повлияет на базовый контейнер Azure Cosmos DB и соответствующее аналитическое хранилище.

Этот сценарий удобен для повторного использования таблиц Spark с применением средств сторонних разработчиков и предоставления доступа к базовым данным в среде выполнения.

Синтаксис для создания таблицы Spark выглядит следующим образом:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Примечание

Если у вас есть сценарии, в которых схема базового контейнера Azure Cosmos DB изменяется с течением времени, и если вы хотите, чтобы обновленная схема автоматически отражалась в запросах к таблице Spark, в параметрах таблицы Spark установите для параметра spark.cosmos.autoSchemaMerge значение true.

Запись кадра данных Spark в контейнер Azure Cosmos DB

В этом примере вы запишете кадр данных Spark в контейнер Azure Cosmos DB. Эта операция влияет на производительность рабочих нагрузок транзакций и на использование единиц запросов, подготовленных в контейнере Azure Cosmos DB или общей базе данных.

Синтаксис на языке Python будет следующим:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Загрузка DataFrame для потоковой передачи из контейнера

В этом жесте вы будете использовать возможность потоковой передачи Spark для загрузки данных из контейнера в DataFrame. Данные будут храниться в основной учетной записи Data Lake (и файловой системе), подключенной к рабочей области.

Примечание

Если вы хотите ссылаться на внешние библиотеки в Synapse Apache Spark, просмотрите дополнительные сведения здесь. Например, если вы хотите принять кадр данных Spark в контейнер Azure Cosmos DB для MongoDB, можно использовать соединитель MongoDB для Spark.

Загрузка кадра данных для потоковой передачи из контейнера Azure Cosmos DB

В этом примере вы будете использовать функцию структурированной потоковой передачи Spark для загрузки данных из контейнера Azure Cosmos DB в кадр данных потоковой передачи Spark с помощью функции канала изменений в Azure Cosmos DB. Данные контрольных точек, используемые Spark, будут храниться в основной учетной записи озера данных (и файловой системе), подключенной к рабочей области.

Если папка /localReadCheckpointFolder отсутствует (в примере ниже), она будет создана автоматически. Эта операция влияет на производительность рабочих нагрузок транзакций и на использование единиц запросов, подготовленных в контейнере Azure Cosmos DB или общей базе данных.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Запись кадра данных для потоковой передачи в контейнер Azure Cosmos DB

В этом примере вы запишете кадр данных для потоковой передачи в контейнер Azure Cosmos DB. Эта операция влияет на производительность рабочих нагрузок транзакций и на использование единиц запросов, подготовленных в контейнере Azure Cosmos DB или общей базе данных. Если папка /localWriteCheckpointFolder отсутствует (в примере ниже), она будет создана автоматически.

Синтаксис на языке Python будет следующим:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Эквивалентный синтаксис на языке Scala будет следующим:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Следующие шаги