Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
SI APPLICA A: NoSQL
In questa esercitazione si usa il connettore Spark di Azure Cosmos DB per leggere o scrivere dati da un account Azure Cosmos DB per NoSQL. Questa esercitazione usa Azure Databricks e un notebook Jupyter per illustrare come eseguire l'integrazione con l'API per NoSQL da Spark. Questa esercitazione è incentrata su Python e Scala, anche se è possibile usare qualsiasi linguaggio o interfaccia supportata da Spark.
In questa esercitazione verranno illustrate le procedure per:
- Connettersi a un account API per un account NoSQL usando un notebook di Jupyter e Spark.
- Creare risorse di database e contenitori.
- Inserire dati nel contenitore.
- Eseguire query sui dati nel contenitore.
- Eseguire operazioni comuni sugli elementi nel contenitore.
Prerequisites
- Un account Azure Cosmos DB for NoSQL già presente.
- Se si ha già una sottoscrizione di Azure, creare un nuovo account.
- Un'area di lavoro di Azure Databricks esistente.
Connettersi con Spark e Jupyter
Usare l'area di lavoro di Azure Databricks esistente per creare un cluster di calcolo pronto per usare Apache Spark 3.4.x per connettersi all'account Azure Cosmos DB per NoSQL.
Aprire l'area di lavoro di Azure Databricks.
Nell'interfaccia dell'area di lavoro creare un nuovo cluster. Configurare il cluster con queste impostazioni, almeno:
Version Value Versione del runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Usare l'interfaccia dell'area di lavoro per cercare i pacchetti Maven da Maven Central con un ID gruppo di
com.azure.cosmos.spark. Installare il pacchetto specifico per Spark 3.4 con un ID artefatto preceduto dal prefissoazure-cosmos-spark_3-4nel cluster.Infine, creare un nuovo notebook.
Tip
Per impostazione predefinita, il notebook è collegato al cluster creato di recente.
Nel notebook, impostare le impostazioni di configurazione OLTP (Online Transaction Processing) per l'endpoint dell'account NoSQL, il nome del database e il nome del contenitore.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Creare un database e un contenitore
Usare l'API catalogo per gestire le risorse dell'account, ad esempio database e contenitori. È quindi possibile usare OLTP per gestire i dati all'interno delle risorse del contenitore.
Configurare l'API catalogo per gestire le risorse API per NoSQL usando Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Creare un nuovo database denominato
cosmicworksusandoCREATE DATABASE IF NOT EXISTS.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Creare un nuovo contenitore denominato
productsusandoCREATE TABLE IF NOT EXISTS. Assicurarsi di impostare il percorso della chiave di partizione su/categorye abilitare la velocità effettiva con scalabilità automatica con una velocità effettiva massima di1000unità richiesta (UR) al secondo.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Creare un altro contenitore denominato
employeesusando una configurazione gerarchica della chiave di partizione. Usare/organization,/departmente/teamcome set di percorsi di chiave di partizione. Seguire l'ordine specifico. Impostare anche la velocità effettiva su un numero manuale di400UR.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Eseguire le celle del notebook per assicurarsi che il database e i contenitori vengano creati nell'API per l'account NoSQL.
Inserire dati
Creare un set di dati di esempio. Usare quindi OLTP per inserire tali dati nel contenitore API per NoSQL.
Creare un set di dati di esempio.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Usare
spark.createDataFramee la configurazione OLTP salvata in precedenza per aggiungere dati di esempio al contenitore di destinazione.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Dati di query
Caricare dati OLTP in un frame di dati per eseguire query comuni sui dati. È possibile usare varie sintassi per filtrare o eseguire query sui dati.
Usare
spark.readper caricare i dati OLTP in un oggetto frame di dati. Usare la stessa configurazione usata in precedenza in questa esercitazione. Inoltre, impostarespark.cosmos.read.inferSchema.enabledsutrueper consentire al connettore Spark di dedurre lo schema eseguendo il campionamento degli elementi esistenti.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Eseguire il rendering dello schema dei dati caricati nel frame di dati usando
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Eseguire il rendering delle righe di dati in cui la colonna
quantityè minore di20. Usare le funzioniwhereeshowper eseguire questa query.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Eseguire il rendering della prima riga di dati in cui la colonna
clearanceètrue. Usare la funzionefilterper eseguire questa query.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Eseguire il rendering di cinque righe di dati senza filtro o troncamento. Usare la funzione
showper personalizzare l'aspetto e il numero di righe di cui viene eseguito il rendering.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Eseguire query sui dati usando questa stringa di query NoSQL non elaborata:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Eseguire operazioni comuni
Quando si usa l'API per i dati NoSQL in Spark, è possibile eseguire aggiornamenti parziali o usare i dati come JSON non elaborati.
Per eseguire un aggiornamento parziale di un elemento:
Copiare la variabile di configurazione esistente
confige modificare le proprietà nella nuova copia. In particolare, configurare la strategia di scrittura suItemPatch. Disabilitare quindi il supporto in blocco. Impostare le colonne e le operazioni mappate. Impostare infine il tipo di operazione predefinito suSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Creare variabili per la chiave di partizione dell'elemento e l'identificatore univoco di destinazione come parte di questa operazione di patch.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Creare un set di oggetti patch per specificare l'elemento di destinazione e specificare i campi da modificare.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Creare un frame di dati usando il set di oggetti patch. Usare
writeper eseguire l'operazione di patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Eseguire una query per esaminare i risultati dell'operazione di patch. L'elemento dovrebbe ora essere denominato
Yamba New Surfboardsenza altre modifiche.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Per usare dati JSON non elaborati:
Copiare la variabile di configurazione esistente
confige modificare le proprietà nella nuova copia. In particolare, modificare il contenitore di destinazione inemployees. Configurare quindi la colonna/campocontactsper l'uso di dati JSON non elaborati.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Creare un set di dipendenti da inserire nel contenitore.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Creare un frame di dati e usare
writeper inserire i dati dei dipendenti.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Eseguire il rendering dei dati dal frame di dati usando
show. Osservare che la colonnacontactsè un JSON non elaborato nell'output.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenuti correlati
- Apache Spark
- API catalogo di Azure Cosmos DB
- Riferimento ai parametri di configurazione
- Esempi di connettori Spark per Azure Cosmos DB
- Eseguire la migrazione da Spark 2.4 a Spark 3.*
- Versioni deprecate:
- Il connettore Spark di Azure Cosmos DB per Spark 3.1 e 3.2 è deprecato perché non sono più disponibili runtime Spark 3.1 o 3.2 supportati in Azure Databricks, Azure Synapse o Azure HDInsight.
- Guida alla migrazione per l'aggiornamento da Spark 3.1
- Guida alla migrazione per l'aggiornamento da Spark 3.2
- Compatibilità tra versioni:
- Note sulla versione:
- Collegamenti per il download: