Kurz: Připojení do služby Azure Cosmos DB for NoSQL pomocí Sparku
PLATÍ PRO: NoSQL
V tomto kurzu použijete konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu dat z účtu Azure Cosmos DB for NoSQL. Tento kurz používá Azure Databricks a poznámkový blok Jupyter k ilustraci integrace s rozhraním API pro NoSQL ze Sparku. Tento kurz se zaměřuje na Python a Scala, i když můžete použít libovolný jazyk nebo rozhraní podporované Sparkem.
V tomto kurzu se naučíte:
- Připojení k účtu ROZHRANÍ API pro NoSQL pomocí Sparku a poznámkového bloku Jupyter
- Vytvoření prostředků databáze a kontejneru
- Příjem dat do kontejneru
- Dotazování dat v kontejneru
- Provádění běžných operací s položkami v kontejneru
Požadavky
- Existující účet Azure Cosmos DB for NoSQL.
- Pokud máte existující předplatné Azure, vytvořte nový účet.
- Žádné předplatné Azure? Službu Azure Cosmos DB můžete vyzkoušet zdarma bez nutnosti platební karty.
- Existující pracovní prostor Azure Databricks.
Připojení s využitím Sparku a Jupyteru
Pomocí stávajícího pracovního prostoru Azure Databricks vytvořte výpočetní cluster připravený k použití Apache Sparku 3.4.x pro připojení k vašemu účtu Azure Cosmos DB for NoSQL.
Otevřete pracovní prostor Azure Databricks.
V rozhraní pracovního prostoru vytvořte nový cluster. Nakonfigurujte cluster s těmito nastaveními minimálně:
Hodnota Verze modulu runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Pomocí rozhraní pracovního prostoru vyhledejte balíčky Maven z Maven Central s ID
com.azure.cosmos.spark
skupiny . Nainstalujte balíček specifický pro Spark 3.4 s ID artefaktu sazure-cosmos-spark_3-4
předponou clusteru.Nakonec vytvořte nový poznámkový blok.
Tip
Ve výchozím nastavení se poznámkový blok připojí k nedávno vytvořenému clusteru.
V poznámkovém bloku nastavte nastavení konfigurace OLTP pro koncový bod účtu NoSQL, název databáze a název kontejneru.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Vytvoření databáze a kontejneru
Rozhraní API katalogu slouží ke správě prostředků účtu, jako jsou databáze a kontejnery. Pak můžete použít OLTP ke správě dat v rámci prostředků kontejneru.
Nakonfigurujte rozhraní API katalogu pro správu prostředků NoSQL pomocí Sparku.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Vytvořte novou databázi s názvem
cosmicworks
usingCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Vytvořte nový kontejner s názvem
products
usingCREATE TABLE IF NOT EXISTS
. Ujistěte se, že jste nastavili cestu/category
ke klíči oddílu a povolili propustnost automatického1000
škálování s maximální propustností jednotek žádostí za sekundu (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Vytvořte jiný kontejner pojmenovaný
employees
pomocí hierarchické konfigurace klíče oddílu s parametrem/organization
,/department
a/team
jako sadu cest ke klíči oddílu v daném konkrétním pořadí. Také nastavte propustnost na ruční množství400
RU/s.# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Spusťte buňku poznámkového bloku a ověřte, že se vaše databáze a kontejnery vytvářejí v rámci vašeho účtu ROZHRANÍ API pro NoSQL.
Ingestace dat
Vytvořte ukázkovou datovou sadu a pak pomocí OLTP ingestujte tato data do kontejneru API for NoSQL.
Vytvořte ukázkovou datovou sadu.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
K přidání ukázkových dat do cílového kontejneru použijte
spark.createDataFrame
dříve uloženou konfiguraci OLTP.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Zadávání dotazů na data
Načtěte data OLTP do datového rámce, abyste mohli provádět běžné dotazy na data. Můžete použít různé syntaxe filtrování nebo dotazování dat.
Slouží
spark.read
k načtení dat OLTP do objektu datového rámce. Použijte stejnou konfiguraci, jakou jste použili dříve v tomto kurzu. Pokud chcete konektor Sparku povolit odvození schématu vzorkováním existujících položek, nastavtespark.cosmos.read.inferSchema.enabled
na hodnotu true.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Vykreslení schématu dat načtených v datovém rámci pomocí
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Vykreslujte řádky dat, ve kterých
quantity
je sloupec menší než20
.where
K provedení tohoto dotazu použijte funkce ashow
funkce.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Vykreslí první řádek dat, ve
clearance
kterém je sloupec pravdivý.filter
K provedení tohoto dotazu použijte funkci.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Vykreslí pět řádků dat bez filtru nebo zkrácení.
show
Pomocí funkce můžete přizpůsobit vzhled a počet vykreslených řádků.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Dotazujte se na data pomocí tohoto nezpracovaného řetězce dotazu NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Provádění běžných operací
Při práci s daty API for NoSQL ve Sparku můžete provádět částečné aktualizace nebo pracovat s daty jako nezpracovaným json.
Pokud chcete provést částečnou aktualizaci položky, proveďte tyto kroky:
Zkopírujte existující
config
konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně; nakonfigurujte strategii zápisu naItemPatch
, zakažte hromadnou podporu, nastavte sloupce a mapované operace a nakonec nastavte výchozí typ operace naSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Vytvořte proměnné pro klíč oddílu položky a jedinečný identifikátor, na který chcete cílit jako součást této operace opravy.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Vytvořte sadu objektů oprav, které určí cílovou položku a určí pole, která se mají upravit.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Vytvořte datový rámec pomocí sady objektů oprav a použijte
write
k provedení operace opravy.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Spuštěním dotazu zkontrolujte výsledky operace opravy. Položka by teď měla být pojmenovaná
Yamba New Surfboard
bez dalších změn.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Pokud chcete pracovat s nezpracovanými daty JSON, postupujte takto:
Zkopírujte existující
config
konfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně; změňte cílový kontejner na sloupec nebo pole tak,employees
aby používal nezpracovaná data JSON a nakonfigurovali hocontacts
.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Vytvořte sadu zaměstnanců, kteří se mají ingestovat do kontejneru.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Vytvořte datový rámec a použijte
write
ingestování dat zaměstnanců.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Vykreslení dat z datového rámce pomocí
show
. Všimněte si, že ve výstupucontacts
je nezpracovaný JSON.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Související obsah
- Apache Spark
- Rozhraní API katalogu Služby Azure Cosmos DB
- Referenční informace o parametrech konfigurace
- Ukázkový poznámkový blok New York City Taxi
- Migrace ze Sparku 2.4 na Spark 3.*
- Kompatibilita verzí
- Poznámky
- Odkazy ke stažení