Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
PLATÍ PRO: NoSQL
V tomto kurzu použijete konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu dat z účtu Azure Cosmos DB for NoSQL. Tento kurz používá Azure Databricks a poznámkový blok Jupyter k ilustraci integrace s rozhraním API pro NoSQL ze Sparku. Tento kurz se zaměřuje na Python a Scala, i když můžete použít libovolný jazyk nebo rozhraní podporované Sparkem.
V tomto kurzu se naučíte:
- Připojte se k účtu API pro NoSQL pomocí Sparku a Jupyter notebooku.
- Vytvořte prostředky databáze a kontejneru.
- Příjem dat do kontejneru
- Dotazování dat v kontejneru
- Provádění běžných operací s položkami v kontejneru
Prerequisites
- Existující účet Azure Cosmos DB pro NoSQL.
- Pokud máte existující předplatné Azure, vytvořte nový účet.
- Existující pracovní prostor Azure Databricks.
Připojení pomocí Sparku a Jupyteru
Pomocí stávajícího pracovního prostoru Azure Databricks vytvořte výpočetní cluster připravený k použití Apache Sparku 3.4.x pro připojení k vašemu účtu Azure Cosmos DB for NoSQL.
Otevřete pracovní prostor Azure Databricks.
V rozhraní pracovního prostoru vytvořte nový cluster. Nakonfigurujte cluster s těmito nastaveními minimálně:
Version Value Verze běhového prostředí 13.3 LTS (Scala 2.12, Spark 3.4.1) Pomocí rozhraní pracovního prostoru vyhledejte balíčky Maven z Maven Central s identifikátorem skupiny
com.azure.cosmos.spark. Nainstalujte balíček speciálně pro Spark 3.4 s ID artefaktu předponouazure-cosmos-spark_3-4clusteru.Nakonec vytvořte nový poznámkový blok.
Tip
Ve výchozím nastavení je poznámkový blok připojený k nedávno vytvořenému klastru.
V poznámkovém bloku nastavte nastavení konfigurace online zpracování transakcí (OLTP) pro koncový bod účtu NoSQL, název databáze a název kontejneru.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Vytvoření databáze a kontejneru
Rozhraní API katalogu slouží ke správě prostředků účtu, jako jsou databáze a kontejnery. Pak můžete použít OLTP ke správě dat v rámci prostředků kontejneru.
Nakonfigurujte rozhraní API katalogu pro správu prostředků API pro NoSQL pomocí Sparku.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Vytvořte novou databázi s názvem
cosmicworkspomocí příkazuCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Vytvořte nový kontejner pojmenovaný
productspomocí .CREATE TABLE IF NOT EXISTSUjistěte se, že jste nastavili cestu/categoryk klíči oddílu a povolili propustnost automatického1000škálování s maximální propustností jednotek žádostí (RU) za sekundu.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Vytvořte další kontejner s názvem
employeespomocí konfigurace hierarchického klíče oddílu. Použijte/organization,/departmenta/teamjako sadu cest klíčů oddílu. Postupujte podle tohoto konkrétního pořadí. Také nastavte propustnost na ruční počet400RU.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Spusťte buňky poznámkového bloku, abyste ověřili, že vaše databáze a kontejnery byly vytvořeny ve vašem účtu služby API pro NoSQL.
Přijímání dat
Vytvořte ukázkovou datovou sadu. Pak pomocí OLTP ingestujte tato data do kontejneru API for NoSQL.
Vytvořte ukázkovou datovou sadu.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )K přidání ukázkových dat do cílového kontejneru použijte
spark.createDataFramea dříve uloženou konfiguraci OLTP.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Dotaz k datům
Načtěte data OLTP do datového rámce, abyste mohli provádět běžné dotazy na data. K filtrování nebo dotazování dat můžete použít různé syntaxe.
Slouží
spark.readk načtení dat OLTP do objektu datového rámce. Použijte stejnou konfiguraci, kterou jste použili dříve v tomto kurzu. Nastavte takéspark.cosmos.read.inferSchema.enablednatrue, aby konektor Spark mohl odvodit schéma vzorkováním existujících položek.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Vykreslení schématu dat načtených v datovém rámci pomocí
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Zobrazte řádky dat, kde je sloupec
quantitymenší než20. Použijte funkcewhereashowk provedení tohoto dotazu.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Vykreslí první řádek dat, kde je sloupec
clearancetrue.filterK provedení tohoto dotazu použijte funkci.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Vykreslí pět řádků dat bez filtru nebo zkrácení.
showPomocí funkce můžete přizpůsobit vzhled a počet vykreslených řádků.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Dotazování dat pomocí tohoto nezpracovaného řetězce dotazu NoSQL:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Provádění běžných operací
Při práci s daty API for NoSQL ve Sparku můžete provádět částečné aktualizace nebo pracovat s daty jako nezpracovaným kódem JSON.
Provedení částečné aktualizace položky:
Zkopírujte existující
configkonfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně nakonfigurujte strategii zápisu naItemPatch. Pak hromadnou podporu zakažte. Nastavte sloupce a mapované operace. Nakonec nastavte výchozí typ operace naSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Vytvořte proměnné pro klíč oddílu položky a jedinečný identifikátor, na který chcete cílit jako součást této operace opravy.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Vytvořte sadu objektů oprav, které určí cílovou položku a určí pole, která se mají upravit.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Vytvořte datový rámec pomocí sady objektů patch. Použijte
writek provedení operace opravy.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Spuštěním dotazu zkontrolujte výsledky operace opravy. Položka by teď měla být pojmenovaná
Yamba New Surfboardbez dalších změn.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Práce s nezpracovanými daty JSON:
Zkopírujte existující
configkonfigurační proměnnou a upravte vlastnosti v nové kopii. Konkrétně změňte cílový kontejner naemployees. Potom nakonfigurujtecontactssloupec nebo pole tak, aby používal nezpracovaná data JSON.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Vytvořte skupinu zaměstnanců pro nahrání do kontejneru.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Vytvořte datový rámec a použijte
writek importu dat zaměstnanců.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Vykreslení dat z datového rámce pomocí
show. Všimněte si, že ve výstupucontactsje nezpracovaný JSON.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Související obsah
- Apache Spark
- Rozhraní API katalogu Azure Cosmos DB
- Referenční informace o parametrech konfigurace
- Ukázky konektoru Spark pro Azure Cosmos DB
- Migrace ze Sparku 2.4 na Spark 3.*
- Zastaralé verze:
- Konektor Azure Cosmos DB Spark pro verze Spark 3.1 a 3.2 je zastaralý, protože v Azure Databricks, Azure Synapse ani Azure HDInsight už nejsou dostupné žádné moduly runtime pro Spark 3.1 nebo 3.2.
- Průvodce migrací pro aktualizaci ze Sparku 3.1
- Průvodce migrací pro aktualizaci ze Sparku 3.2
- Kompatibilita verzí:
- Poznámky k vydání
- Odkazy ke stažení: