Поделиться через


Руководство. Подключение к Azure Cosmos DB для NoSQL с помощью Spark

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этом руководстве вы используете соединитель Spark Azure Cosmos DB для чтения или записи данных из учетной записи Azure Cosmos DB для NoSQL. В этом руководстве используется Azure Databricks и записная книжка Jupyter для иллюстрации интеграции с API для NoSQL из Spark. В этом руководстве основное внимание уделяется Python и Scala, хотя вы можете использовать любой язык или интерфейс, поддерживаемый Spark.

В этом руководстве описано следующее:

  • Подключитесь к учетной записи API для NoSQL с помощью Spark и записной книжки Jupyter.
  • Создать базу данных и ресурсы контейнера.
  • Прием данных в контейнер.
  • Запрос данных в контейнере.
  • Выполнение общих операций с элементами в контейнере.

Необходимые компоненты

Подключение с помощью Spark и Jupyter

Используйте существующую рабочую область Azure Databricks для создания вычислительного кластера, готового к использованию Apache Spark 3.4.x для подключения к учетной записи Azure Cosmos DB для NoSQL.

  1. Откройте рабочую область Azure Databricks.

  2. В интерфейсе рабочей области создайте новый кластер. Настройте кластер с этими параметрами как минимум:

    Версия Значение
    Версия среды выполнения 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Используйте интерфейс рабочей области для поиска пакетов Maven из Maven Central с идентификатором com.azure.cosmos.sparkгруппы. Установите пакет специально для Spark 3.4 с префиксом идентификатора артефакта, заданным в azure-cosmos-spark_3-4 кластере.

  4. Наконец, создайте новую записную книжку.

    Совет

    По умолчанию записная книжка подключена к недавно созданному кластеру.

  5. В записной книжке задайте параметры конфигурации оперативной обработки транзакций (OLTP) для конечной точки учетной записи NoSQL, имени базы данных и имени контейнера.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Создание базы данных и контейнера

Используйте API каталога для управления ресурсами учетной записи, такими как базы данных и контейнеры. Затем можно использовать OLTP для управления данными в ресурсах контейнера.

  1. Настройте API каталога для управления ресурсами API noSQL с помощью Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Создайте новую базу данных с именем cosmicworks с помощью CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Создайте новый контейнер с именем products с помощью CREATE TABLE IF NOT EXISTS. Убедитесь, что путь /category ключа секции задан и включена пропускная способность автомасштабирования с максимальной пропускной способностью 1000 единиц запросов (ЕЗ) в секунду.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Создайте другой контейнер с именем employees с помощью конфигурации ключа иерархической секции. Используйте /organizationи /department/team в качестве набора путей ключа секции. Следуйте этому конкретному заказу. Кроме того, задайте пропускную способность вручную 400 .

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Запустите ячейки записной книжки, чтобы убедиться, что база данных и контейнеры созданы в вашей учетной записи API для NoSQL.

Прием данных

Создание примера набора данных. Затем используйте OLTP для приема данных в контейнер API для NoSQL.

  1. Создание примера набора данных.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Используйте spark.createDataFrame и ранее сохраненную конфигурацию OLTP для добавления примеров данных в целевой контейнер.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Запрос данных

Загрузите данные OLTP в кадр данных для выполнения общих запросов к данным. Для фильтрации или запроса данных можно использовать различные синтаксисы.

  1. Используется spark.read для загрузки данных OLTP в объект кадра данных. Используйте ту же конфигурацию, которую вы использовали ранее в этом руководстве. Кроме того, установите флажок spark.cosmos.read.inferSchema.enabled , чтобы true разрешить соединителю Spark выводить схему путем выборки существующих элементов.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Отрисовка схемы данных, загруженных в кадр данных, с помощью printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Отрисовка строк данных, quantity в которых столбец меньше 20. where Используйте функции и show функции для выполнения этого запроса.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Отрисовка первой строки данных, clearance в которой находится trueстолбец. Используйте функцию filter для выполнения этого запроса.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Отрисовка пяти строк данных без фильтрации или усечения. Используйте функцию show для настройки внешнего вида и количества отображаемых строк.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Запросите данные с помощью этой необработанной строки запроса NoSQL: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Выполнение распространенных операций

При работе с данными API для NoSQL в Spark можно выполнять частичные обновления или работать с данными как необработанные JSON.

  1. Чтобы выполнить частичное обновление элемента:

    1. Скопируйте существующую config переменную конфигурации и измените свойства в новой копии. В частности, настройте стратегию ItemPatchзаписи в . Затем отключите массовую поддержку. Задайте столбцы и сопоставленные операции. Наконец, задайте для типа операции по умолчанию значение Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Создайте переменные для ключа секции элемента и уникальный идентификатор, предназначенный для выполнения этой операции исправления.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Создайте набор объектов исправлений, чтобы указать целевой элемент и указать поля, которые необходимо изменить.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Создайте кадр данных с помощью набора объектов исправлений. Используется write для выполнения операции исправления.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Запустите запрос, чтобы просмотреть результаты операции исправления. Теперь элемент должен быть назван Yamba New Surfboard без других изменений.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Для работы с необработанными данными JSON:

    1. Скопируйте существующую config переменную конфигурации и измените свойства в новой копии. В частности, измените целевой контейнер employeesна . Затем настройте contacts столбец или поле для использования необработанных данных JSON.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Создайте набор сотрудников для приема в контейнер.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Создайте кадр данных и используйте write для приема данных сотрудника.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Отрисовка данных из кадра данных с помощью show. Обратите внимание, что contacts столбец является необработанным JSON в выходных данных.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Следующий шаг