Руководство. Подключение к Azure Cosmos DB для NoSQL с помощью Spark
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
В этом руководстве вы используете соединитель Spark Azure Cosmos DB для чтения или записи данных из учетной записи Azure Cosmos DB для NoSQL. В этом руководстве используется Azure Databricks и записная книжка Jupyter для иллюстрации интеграции с API для NoSQL из Spark. В этом руководстве основное внимание уделяется Python и Scala, хотя вы можете использовать любой язык или интерфейс, поддерживаемый Spark.
В этом руководстве описано следующее:
- Подключитесь к учетной записи API для NoSQL с помощью Spark и записной книжки Jupyter.
- Создать базу данных и ресурсы контейнера.
- Прием данных в контейнер.
- Запрос данных в контейнере.
- Выполнение общих операций с элементами в контейнере.
Необходимые компоненты
- Существующая учетная запись Azure Cosmos DB для NoSQL.
- Если у вас есть подписка Azure, создайте новую учетную запись.
- Нет подписки Azure? Вы можете попробовать Azure Cosmos DB бесплатно без кредитной карты.
- Существующая рабочая область Azure Databricks.
Подключение с помощью Spark и Jupyter
Используйте существующую рабочую область Azure Databricks для создания вычислительного кластера, готового к использованию Apache Spark 3.4.x для подключения к учетной записи Azure Cosmos DB для NoSQL.
Откройте рабочую область Azure Databricks.
В интерфейсе рабочей области создайте новый кластер. Настройте кластер с этими параметрами как минимум:
Версия Значение Версия среды выполнения 13.3 LTS (Scala 2.12, Spark 3.4.1) Используйте интерфейс рабочей области для поиска пакетов Maven из Maven Central с идентификатором
com.azure.cosmos.spark
группы. Установите пакет специально для Spark 3.4 с префиксом идентификатора артефакта, заданным вazure-cosmos-spark_3-4
кластере.Наконец, создайте новую записную книжку.
Совет
По умолчанию записная книжка подключена к недавно созданному кластеру.
В записной книжке задайте параметры конфигурации оперативной обработки транзакций (OLTP) для конечной точки учетной записи NoSQL, имени базы данных и имени контейнера.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Создание базы данных и контейнера
Используйте API каталога для управления ресурсами учетной записи, такими как базы данных и контейнеры. Затем можно использовать OLTP для управления данными в ресурсах контейнера.
Настройте API каталога для управления ресурсами API noSQL с помощью Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Создайте новую базу данных с именем
cosmicworks
с помощьюCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Создайте новый контейнер с именем
products
с помощьюCREATE TABLE IF NOT EXISTS
. Убедитесь, что путь/category
ключа секции задан и включена пропускная способность автомасштабирования с максимальной пропускной способностью1000
единиц запросов (ЕЗ) в секунду.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Создайте другой контейнер с именем
employees
с помощью конфигурации ключа иерархической секции. Используйте/organization
и/department
/team
в качестве набора путей ключа секции. Следуйте этому конкретному заказу. Кроме того, задайте пропускную способность вручную400
.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Запустите ячейки записной книжки, чтобы убедиться, что база данных и контейнеры созданы в вашей учетной записи API для NoSQL.
Прием данных
Создание примера набора данных. Затем используйте OLTP для приема данных в контейнер API для NoSQL.
Создание примера набора данных.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Используйте
spark.createDataFrame
и ранее сохраненную конфигурацию OLTP для добавления примеров данных в целевой контейнер.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Запрос данных
Загрузите данные OLTP в кадр данных для выполнения общих запросов к данным. Для фильтрации или запроса данных можно использовать различные синтаксисы.
Используется
spark.read
для загрузки данных OLTP в объект кадра данных. Используйте ту же конфигурацию, которую вы использовали ранее в этом руководстве. Кроме того, установите флажокspark.cosmos.read.inferSchema.enabled
, чтобыtrue
разрешить соединителю Spark выводить схему путем выборки существующих элементов.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Отрисовка схемы данных, загруженных в кадр данных, с помощью
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Отрисовка строк данных,
quantity
в которых столбец меньше20
.where
Используйте функции иshow
функции для выполнения этого запроса.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Отрисовка первой строки данных,
clearance
в которой находитсяtrue
столбец. Используйте функциюfilter
для выполнения этого запроса.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Отрисовка пяти строк данных без фильтрации или усечения. Используйте функцию
show
для настройки внешнего вида и количества отображаемых строк.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Запросите данные с помощью этой необработанной строки запроса NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Выполнение распространенных операций
При работе с данными API для NoSQL в Spark можно выполнять частичные обновления или работать с данными как необработанные JSON.
Чтобы выполнить частичное обновление элемента:
Скопируйте существующую
config
переменную конфигурации и измените свойства в новой копии. В частности, настройте стратегиюItemPatch
записи в . Затем отключите массовую поддержку. Задайте столбцы и сопоставленные операции. Наконец, задайте для типа операции по умолчанию значениеSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Создайте переменные для ключа секции элемента и уникальный идентификатор, предназначенный для выполнения этой операции исправления.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Создайте набор объектов исправлений, чтобы указать целевой элемент и указать поля, которые необходимо изменить.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Создайте кадр данных с помощью набора объектов исправлений. Используется
write
для выполнения операции исправления.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Запустите запрос, чтобы просмотреть результаты операции исправления. Теперь элемент должен быть назван
Yamba New Surfboard
без других изменений.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Для работы с необработанными данными JSON:
Скопируйте существующую
config
переменную конфигурации и измените свойства в новой копии. В частности, измените целевой контейнерemployees
на . Затем настройтеcontacts
столбец или поле для использования необработанных данных JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Создайте набор сотрудников для приема в контейнер.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Создайте кадр данных и используйте
write
для приема данных сотрудника.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Отрисовка данных из кадра данных с помощью
show
. Обратите внимание, чтоcontacts
столбец является необработанным JSON в выходных данных.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Связанный контент
- Apache Spark
- API каталога Azure Cosmos DB
- Справочник по параметру конфигурации
- Примеры соединителя Spark для Azure Cosmos DB
- Миграция из Spark 2.4 в Spark 3.*
- Совместимость версий:
- Заметки о выпуске:
- Ссылки на скачивание: