Samouczek: Połączenie do usługi Azure Cosmos DB for NoSQL przy użyciu platformy Spark
DOTYCZY: NoSQL
W tym samouczku użyjesz łącznika Spark usługi Azure Cosmos DB do odczytywania lub zapisywania danych z konta usługi Azure Cosmos DB for NoSQL. W tym samouczku użyto usługi Azure Databricks i notesu Jupyter, aby zilustrować sposób integracji z interfejsem API for NoSQL z platformy Spark. Ten samouczek koncentruje się na językach Python i Scala, mimo że można używać dowolnego języka lub interfejsu obsługiwanego przez platformę Spark.
Z tego samouczka dowiesz się, jak wykonywać następujące czynności:
- Połączenie do interfejsu API dla konta NoSQL przy użyciu platformy Spark i notesu Jupyter
- Tworzenie zasobów bazy danych i kontenera
- Pozyskiwanie danych do kontenera
- Wykonywanie zapytań dotyczących danych w kontenerze
- Wykonywanie typowych operacji na elementach w kontenerze
Wymagania wstępne
- Istniejące konto usługi Azure Cosmos DB for NoSQL.
- Jeśli masz istniejącą subskrypcję platformy Azure, utwórz nowe konto.
- Brak subskrypcji platformy Azure? Możesz wypróbować usługę Azure Cosmos DB bezpłatnie bez konieczności korzystania z karty kredytowej.
- Istniejący obszar roboczy usługi Azure Databricks.
Połączenie przy użyciu platform Spark i jupyter
Użyj istniejącego obszaru roboczego usługi Azure Databricks, aby utworzyć klaster obliczeniowy gotowy do użycia platformy Apache Spark 3.4.x w celu nawiązania połączenia z kontem usługi Azure Cosmos DB for NoSQL.
Otwórz obszar roboczy usługi Azure Databricks.
W interfejsie obszaru roboczego utwórz nowy klaster. Skonfiguruj klaster przy użyciu tych ustawień co najmniej:
Wartość Wersja środowiska uruchomieniowego 13.3 LTS (Scala 2.12, Spark 3.4.1) Użyj interfejsu obszaru roboczego, aby wyszukać pakiety Maven z narzędzia Maven Central przy użyciu identyfikatora grupy .
com.azure.cosmos.spark
Zainstaluj pakiet specyficzny dla platformy Spark 3.4 z prefiksem Identyfikator artefaktu zazure-cosmos-spark_3-4
klastrem.Na koniec utwórz nowy notes.
Napiwek
Domyślnie notes zostanie dołączony do ostatnio utworzonego klastra.
W notesie ustaw ustawienia konfiguracji OLTP dla punktu końcowego konta NoSQL, nazwy bazy danych i nazwy kontenera.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Tworzenie bazy danych i kontenera
Interfejs API wykazu umożliwia zarządzanie zasobami kont, takimi jak bazy danych i kontenery. Następnie można użyć olTP do zarządzania danymi w ramach zasobu kontenera[s].
Skonfiguruj interfejs API wykazu do zarządzania interfejsem API dla zasobów NoSQL przy użyciu platformy Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Utwórz nową bazę danych o nazwie
cosmicworks
przy użyciu poleceniaCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Utwórz nowy kontener o nazwie
products
przy użyciu poleceniaCREATE TABLE IF NOT EXISTS
. Upewnij się, że ustawiono ścieżkę klucza partycji na/category
wartość i włączono przepływność skalowania automatycznego z maksymalną przepływnością1000
jednostek żądań na sekundę (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Utwórz inny kontener o nazwie
employees
przy użyciu hierarchii konfiguracji klucza partycji z elementami/organization
,/department
i/team
jako zestaw ścieżek klucza partycji w tej określonej kolejności. Ponadto ustaw przepływność na ręczną ilość400
jednostek RU/s# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Uruchom komórkę notesu[s], aby sprawdzić, czy baza danych i kontenery zostały utworzone w ramach konta interfejsu API dla noSQL.
Pozyskiwanie danych
Utwórz przykładowy zestaw danych, a następnie użyj usługi OLTP, aby pozyskać te dane do kontenera interfejsu API for NoSQL.
Utwórz przykładowy zestaw danych.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Użyj i
spark.createDataFrame
wcześniej zapisanej konfiguracji OLTP, aby dodać przykładowe dane do kontenera docelowego.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Zapytania o dane
Ładowanie danych OLTP do ramki danych w celu wykonywania typowych zapytań dotyczących danych. Możesz użyć różnych filtrów składni lub wykonywania zapytań dotyczących danych.
Służy
spark.read
do ładowania danych OLTP do obiektu ramki danych. Użyj tej samej konfiguracji, która była używana wcześniej w tym samouczku. Ponadto ustawspark.cosmos.read.inferSchema.enabled
wartość true, aby umożliwić łącznikowi Spark wnioskowanie schematu przez próbkowanie istniejących elementów.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Renderowanie schematu danych załadowanych w ramce danych przy użyciu polecenia
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Renderuj wiersze danych, w których kolumna
quantity
jest mniejsza niż20
.where
Użyj funkcji ishow
, aby wykonać to zapytanie.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Renderuj pierwszy wiersz danych, w którym kolumna
clearance
ma wartość true.filter
Użyj funkcji , aby wykonać to zapytanie.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Renderuj pięć wierszy danych bez filtru ani obcinania.
show
Użyj funkcji , aby dostosować wygląd i liczbę renderowanych wierszy.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Wykonaj zapytanie dotyczące danych przy użyciu tego nieprzetworzonego ciągu zapytania NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Wykonywanie typowych operacji
Podczas pracy z danymi interfejsu API for NoSQL na platformie Spark można wykonywać częściowe aktualizacje lub pracować z danymi jako nieprzetworzone dane JSON.
Aby wykonać częściową aktualizację elementu, wykonaj następujące kroki:
Skopiuj istniejącą
config
zmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. Specjalnie; Skonfiguruj strategię zapisu naItemPatch
wartość , wyłącz obsługę zbiorczą, ustaw kolumny i operacje mapowane, a na koniec ustaw domyślny typ operacji naSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Utwórz zmienne dla klucza partycji elementu i unikatowy identyfikator, który ma być przeznaczony jako część tej operacji poprawki.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Utwórz zestaw obiektów poprawek, aby określić element docelowy i określić pola, które mają zostać zmodyfikowane.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Utwórz ramkę danych przy użyciu zestawu obiektów poprawek i użyj polecenia
write
, aby wykonać operację stosowania poprawek.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Uruchom zapytanie, aby przejrzeć wyniki operacji stosowania poprawki. Element powinien być teraz nazwany
Yamba New Surfboard
bez innych zmian.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Aby pracować z nieprzetworzonymi danymi JSON, wykonaj następujące kroki:
Skopiuj istniejącą
config
zmienną konfiguracji i zmodyfikuj właściwości w nowej kopii. Specjalnie; zmień kontener docelowy naemployees
i skonfiguruj kolumnęcontacts
/pole, aby używać nieprzetworzonych danych JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Utwórz zestaw pracowników do pozyskiwania do kontenera.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Utwórz ramkę danych i użyj jej
write
do pozyskiwania danych pracowników.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Renderuj dane z ramki danych przy użyciu polecenia
show
. Zwróć uwagę, że kolumnacontacts
jest nieprzetworzonym kodem JSON w danych wyjściowych.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Powiązana zawartość
- Apache Spark
- Interfejs API wykazu usługi Azure Cosmos DB
- Dokumentacja parametrów konfiguracji
- Przykładowy notes "New York City Taxi data"
- Migrowanie z platformy Spark 2.4 do platformy Spark 3.*
- Zgodność wersji
- Wersji
- Pobieranie linków