Udostępnij za pośrednictwem


Samouczek: nawiązywanie połączenia z usługą Azure Cosmos DB for NoSQL przy użyciu platformy Spark

DOTYCZY: NoSQL

W tym samouczku użyjesz łącznika Spark usługi Azure Cosmos DB do odczytywania lub zapisywania danych z konta usługi Azure Cosmos DB for NoSQL. W tym samouczku użyto usługi Azure Databricks i notesu Jupyter, aby zilustrować sposób integracji z interfejsem API for NoSQL z platformy Spark. Ten samouczek koncentruje się na językach Python i Scala, chociaż można używać dowolnego języka lub interfejsu obsługiwanego przez platformę Spark.

Z tego samouczka dowiesz się, jak wykonywać następujące czynności:

  • Nawiąż połączenie z kontem interfejsu API dla noSQL przy użyciu platformy Spark i notesu Jupyter.
  • Utwórz zasoby bazy danych i kontenera.
  • Pozyskiwanie danych do kontenera.
  • Wykonywanie zapytań o dane w kontenerze.
  • Wykonywanie typowych operacji na elementach w kontenerze.

Wymagania wstępne

Nawiązywanie połączenia przy użyciu platformy Spark i programu Jupyter

Użyj istniejącego obszaru roboczego usługi Azure Databricks, aby utworzyć klaster obliczeniowy gotowy do użycia platformy Apache Spark 3.4.x w celu nawiązania połączenia z kontem usługi Azure Cosmos DB for NoSQL.

  1. Otwórz obszar roboczy usługi Azure Databricks.

  2. W interfejsie obszaru roboczego utwórz nowy klaster. Skonfiguruj klaster przy użyciu tych ustawień co najmniej:

    Wersja Wartość
    Wersja środowiska uruchomieniowego 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Użyj interfejsu obszaru roboczego, aby wyszukać pakiety Maven z usługi Maven Central przy użyciu identyfikatora com.azure.cosmos.sparkgrupy . Zainstaluj pakiet specjalnie dla platformy Spark 3.4 z prefiksem Artifact ID z azure-cosmos-spark_3-4 klastrem.

  4. Na koniec utwórz nowy notes.

    Napiwek

    Domyślnie notes jest dołączony do ostatnio utworzonego klastra.

  5. W notesie ustaw ustawienia konfiguracji przetwarzania transakcji online (OLTP) dla punktu końcowego konta NoSQL, nazwy bazy danych i nazwy kontenera.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Tworzenie bazy danych i kontenera

Interfejs API wykazu umożliwia zarządzanie zasobami kont, takimi jak bazy danych i kontenery. Następnie można użyć olTP do zarządzania danymi w zasobach kontenera.

  1. Skonfiguruj interfejs API wykazu do zarządzania interfejsem API dla zasobów NoSQL przy użyciu platformy Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Utwórz nową bazę danych o nazwie cosmicworks przy użyciu polecenia CREATE DATABASE IF NOT EXISTS.

    # Create a database by using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database by using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Utwórz nowy kontener o nazwie products przy użyciu polecenia CREATE TABLE IF NOT EXISTS. Upewnij się, że ustawiono ścieżkę klucza partycji na /category wartość i włączono przepływność skalowania automatycznego z maksymalną przepływnością 1000 jednostek żądań (RU) na sekundę.

    # Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Utwórz inny kontener o nazwie employees przy użyciu hierarchicznej konfiguracji klucza partycji. Użyj parametrów /organization, /departmenti /team jako zestawu ścieżek klucza partycji. Postępuj zgodnie z tym konkretnym zamówieniem. Ponadto ustaw przepływność na ręczną ilość 400 jednostek RU.

    # Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container by using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Uruchom komórki notesu, aby sprawdzić, czy baza danych i kontenery zostały utworzone w ramach konta interfejsu API dla noSQL.

Pozyskiwanie danych

Utwórz przykładowy zestaw danych. Następnie użyj olTP, aby pozyskać te dane do interfejsu API dla kontenera NoSQL.

  1. Utwórz przykładowy zestaw danych.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Użyj i spark.createDataFrame wcześniej zapisanej konfiguracji OLTP, aby dodać przykładowe dane do kontenera docelowego.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Zapytania o dane

Ładowanie danych OLTP do ramki danych w celu wykonywania typowych zapytań dotyczących danych. Do filtrowania lub wykonywania zapytań dotyczących danych można użyć różnych składni.

  1. Służy spark.read do ładowania danych OLTP do obiektu ramki danych. Użyj tej samej konfiguracji, która była używana wcześniej w tym samouczku. Ponadto ustawiono wartość spark.cosmos.read.inferSchema.enabled , aby true umożliwić łącznikowi Spark wnioskowanie schematu przez próbkowanie istniejących elementów.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Renderowanie schematu danych załadowanych w ramce danych przy użyciu polecenia printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Renderuj wiersze danych, w których kolumna quantity jest mniejsza niż 20. where Użyj funkcji ishow, aby wykonać to zapytanie.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Renderuj pierwszy wiersz danych, w którym kolumna clearance to true. filter Użyj funkcji , aby wykonać to zapytanie.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Renderuj pięć wierszy danych bez filtru ani obcinania. show Użyj funkcji , aby dostosować wygląd i liczbę renderowanych wierszy.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Wykonaj zapytanie o dane przy użyciu tego nieprzetworzonego ciągu zapytania NoSQL: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Wykonywanie typowych operacji

Podczas pracy z danymi interfejsu API for NoSQL na platformie Spark można wykonywać częściowe aktualizacje lub pracować z danymi jako nieprzetworzone dane JSON.

  1. Aby wykonać częściową aktualizację elementu:

    1. Skopiuj istniejącą config zmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. W szczególności skonfiguruj strategię zapisu na .ItemPatch Następnie wyłącz obsługę zbiorczą. Ustaw kolumny i zamapowane operacje. Na koniec ustaw domyślny typ operacji na Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Utwórz zmienne dla klucza partycji elementu i unikatowy identyfikator, który ma być przeznaczony jako część tej operacji poprawki.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Utwórz zestaw obiektów poprawek, aby określić element docelowy i określić pola, które mają zostać zmodyfikowane.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Utwórz ramkę danych przy użyciu zestawu obiektów poprawek. Użyj polecenia write , aby wykonać operację stosowania poprawek.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Uruchom zapytanie, aby przejrzeć wyniki operacji stosowania poprawki. Element powinien być teraz nazwany Yamba New Surfboard bez innych zmian.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. Aby pracować z nieprzetworzonymi danymi JSON:

    1. Skopiuj istniejącą config zmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. W szczególności zmień kontener docelowy na employees. Następnie skonfiguruj kolumnę contacts /pole, aby używać nieprzetworzonych danych JSON.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Utwórz zestaw pracowników do pozyskiwania do kontenera.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Utwórz ramkę danych i użyj jej write do pozyskiwania danych pracowników.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Renderuj dane z ramki danych przy użyciu polecenia show. Zwróć uwagę, że kolumna contacts jest nieprzetworzonym kodem JSON w danych wyjściowych.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Następny krok