Esercitazione: Connessione ad Azure Cosmos DB per NoSQL con Spark
SI APPLICA A: NoSQL
In questa esercitazione si usa il connettore Spark di Azure Cosmos DB per leggere o scrivere dati da un account Azure Cosmos DB per NoSQL. Questa esercitazione usa Azure Databricks e un notebook jupyter per illustrare come eseguire l'integrazione con l'API per NoSQL da Spark. Questa esercitazione è incentrata su Python e Scala anche se è possibile usare qualsiasi linguaggio o interfaccia supportata da Spark.
In questa esercitazione apprenderai a:
- Connessione a un account API per NoSQL usando Spark e un notebook di Jupyter
- Creare risorse di database e contenitori
- Inserire dati nel contenitore
- Eseguire query sui dati nel contenitore
- Eseguire operazioni comuni sugli elementi nel contenitore
Prerequisiti
- Un account Azure Cosmos DB per NoSQL esistente.
- Se si ha una sottoscrizione di Azure esistente, creare un nuovo account.
- Nessuna sottoscrizione di Azure? È possibile provare Gratuitamente Azure Cosmos DB senza richiedere carta di credito.
- Un'area di lavoro di Azure Databricks esistente.
Connessione con Spark e Jupyter
Usare l'area di lavoro di Azure Databricks esistente per creare un cluster di calcolo pronto per usare Apache Spark 3.4.x per connettersi all'account Azure Cosmos DB per NoSQL.
Aprire l'area di lavoro di Azure Databricks.
Nell'interfaccia dell'area di lavoro creare un nuovo cluster. Configurare il cluster con queste impostazioni, almeno:
valore Versione di runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Usare l'interfaccia dell'area di lavoro per cercare i pacchetti Maven da Maven Central con un ID gruppo di
com.azure.cosmos.spark
. Installare il pacchetto specifico per Spark 3.4 con un ID artefatto precedutoazure-cosmos-spark_3-4
dal prefisso nel cluster.Infine, creare un nuovo notebook.
Suggerimento
Per impostazione predefinita, il notebook verrà collegato al cluster creato di recente.
Nel notebook impostare le impostazioni di configurazione OLTP per l'endpoint dell'account NoSQL, il nome del database e il nome del contenitore.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Creare un database e un contenitore
Usare l'API catalogo per gestire le risorse dell'account, ad esempio database e contenitori. È quindi possibile usare OLTP per gestire i dati all'interno della risorsa contenitore[s].
Configurare l'API catalogo per gestire l'API per le risorse NoSQL usando Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Creare un nuovo database denominato
cosmicworks
usandoCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Creare un nuovo contenitore denominato
products
usandoCREATE TABLE IF NOT EXISTS
. Assicurarsi di impostare il percorso della chiave di partizione su/category
e abilitare la velocità effettiva con scalabilità automatica con una velocità effettiva massima di1000
unità richiesta al secondo (UR/sec).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Creare un altro contenitore denominato
employees
usando una configurazione gerarchica della chiave di partizione con/organization
,/department
e/team
come set di percorsi di chiave di partizione in tale ordine specifico. Impostare anche la velocità effettiva su una quantità manuale di400
UR/sec# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Eseguire la cella del notebook [s] per verificare che il database e i contenitori vengano creati nell'account API per NoSQL.
Inserire i dati
Creare un set di dati di esempio e quindi usare OLTP per inserire tali dati nel contenitore API per NoSQL.
Creare un set di dati di esempio.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Usare
spark.createDataFrame
e la configurazione OLTP salvata in precedenza per aggiungere dati di esempio al contenitore di destinazione.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Eseguire query sui dati
Caricare dati OLTP in un frame di dati per eseguire query comuni sui dati. È possibile usare varie sintassi per filtrare o eseguire query sui dati.
Usare
spark.read
per caricare i dati OLTP in un oggetto dataframe. Usare la stessa configurazione usata in precedenza in questa esercitazione. Impostare anche suspark.cosmos.read.inferSchema.enabled
true per consentire al connettore Spark di dedurre lo schema eseguendo il campionamento degli elementi esistenti.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Eseguire il rendering dello schema dei dati caricati nel dataframe usando
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Eseguire il rendering delle righe di dati in cui la
quantity
colonna è minore di20
. Usare lewhere
funzioni eshow
per eseguire questa query.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Eseguire il rendering della prima riga di dati in cui la
clearance
colonna è true. Usare lafilter
funzione per eseguire questa query.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Eseguire il rendering di cinque righe di dati senza filtro o troncamento. Usare la
show
funzione per personalizzare l'aspetto e il numero di righe di cui viene eseguito il rendering.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Eseguire query sui dati usando questa stringa di query NoSQL non elaborata:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Eseguire operazioni comuni
Quando si usa l'API per i dati NoSQL in Spark, è possibile eseguire aggiornamenti parziali o usare i dati come JSON non elaborati.
Per eseguire un aggiornamento parziale di un elemento, seguire questa procedura:
Copiare la variabile di configurazione esistente
config
e modificare le proprietà nella nuova copia. Specificamente; configurare la strategia di scrittura suItemPatch
, disabilitare il supporto bulk, impostare le colonne e le operazioni mappate e infine impostare il tipo di operazione predefinito suSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Creare variabili per la chiave di partizione dell'elemento e l'identificatore univoco di destinazione come parte di questa operazione di patch.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Creare un set di oggetti patch per specificare l'elemento di destinazione e specificare i campi da modificare.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Creare un frame di dati usando il set di oggetti patch e usare
write
per eseguire l'operazione di patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Eseguire una query per esaminare i risultati dell'operazione di patch. L'elemento dovrebbe ora essere denominato
Yamba New Surfboard
senza altre modifiche.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Per usare dati JSON non elaborati, seguire questa procedura:
Copiare la variabile di configurazione esistente
config
e modificare le proprietà nella nuova copia. Specificamente; modificare il contenitore di destinazione inemployees
e configurare lacontacts
colonna/campo in modo da usare dati JSON non elaborati.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Creare un set di dipendenti da inserire nel contenitore.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Creare un frame di dati e usare
write
per inserire i dati dei dipendenti.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Eseguire il rendering dei dati dal frame di dati usando
show
. Osservare che lacontacts
colonna è JSON non elaborato nell'output.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Contenuto correlato
- Apache Spark
- API catalogo di Azure Cosmos DB
- Riferimento ai parametri di configurazione
- Notebook di esempio "Dati dei taxi di New York City"
- Eseguire la migrazione da Spark 2.4 a Spark 3.*
- Compatibilità delle versioni
- Note sulla versione
- Scaricare i collegamenti