Tutorial: Conectar-se ao Azure Cosmos DB for NoSQL usando o Spark
APLICA-SE A: NoSQL
Neste tutorial, você usará o conector do Azure Cosmos DB Spark para ler ou gravar dados de uma conta do Azure Cosmos DB for NoSQL. Este tutorial usa o Azure Databricks e um Jupyter notebook para ilustrar como se integrar à API para NoSQL por meio do Spark. Este tutorial tem como foco o Python e o Scala, mesmo que você possa usar qualquer linguagem ou interface com suporte do Spark.
Neste tutorial, você aprenderá a:
- Conectar-se a uma conta da API para NoSQL usando o Spark e um Jupyter notebook
- Criar recursos de banco de dados e contêiner
- Ingerir dados no contêiner
- Consultar dados no contêiner
- Executar operações comuns em itens no contêiner
Pré-requisitos
- Uma conta existente do Azure Cosmos DB for NoSQL.
- Se você tiver uma assinatura existente do Azure, crie uma nova conta.
- Nenhuma assinatura do Azure? Você pode experimentar o Azure Cosmos DB gratuitamente sem necessidade de cartão de crédito.
- Um workspace existente do Azure Databricks.
Conectar-se usando o Spark e o Jupyter
Use seu workspace existente do Azure Databricks para criar um cluster de cálculo pronto para usar o Apache Spark 3.4.x, a fim de se conectar à sua conta do Azure Cosmos DB for NoSQL.
Abra o workspace do Azure Databricks.
Na interface do workspace, crie um cluster. Defina o cluster com estas configurações, no mínimo:
Valor Versão de runtime 13.3 LTS (Scala 2.12, Spark 3.4.1) Use a interface do workspace para pesquisar pacotes do Maven no Repositório Central do Maven com a ID de Grupo
com.azure.cosmos.spark
. Instale o pacote específico do Spark 3.4 com uma ID de Artefato com o prefixoazure-cosmos-spark_3-4
no cluster.Por fim, crie um notebook.
Dica
Por padrão, o notebook será anexado ao cluster recém-criado.
No notebook, defina as configurações do OLTP para o ponto de extremidade da conta do NoSQL, o nome do banco de dados e o nome do contêiner.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Criar um banco de dados e um contêiner
Use a API do Catálogo para gerenciar recursos de conta, como bancos de dados e contêineres. Em seguida, use o OLTP para gerenciar os dados contidos nos recursos de contêiner.
Configure a API do Catálogo para gerenciar os recursos da API para NoSQL usando o Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Crie um novo banco de dados chamado
cosmicworks
usandoCREATE DATABASE IF NOT EXISTS
.# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Crie um contêiner chamado
products
usandoCREATE TABLE IF NOT EXISTS
. Lembre-se de definir o caminho da chave de partição como/category
e habilitar a taxa de transferência de dimensionamento automático com uma taxa de transferência máxima de1000
RU/s (unidades de solicitação por segundo).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Crie outro contêiner chamado
employees
usando uma configuração de chave de partição hierárquica com/organization
,/department
e/team
como o conjunto de caminhos de chave de partição nessa ordem específica. Além disso, defina a taxa de transferência como uma quantidade manual de400
RU/s# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Execute as células do notebook para validar se o banco de dados e os contêineres são criados em sua conta da API para NoSQL.
Ingerir dados
Crie um conjunto de dados de exemplo e use o OLTP para ingerir esses dados no contêiner da API para NoSQL.
Crie um conjunto de dados de exemplo.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Use
spark.createDataFrame
e a configuração do OLTP salva anteriormente para adicionar dados de exemplo ao contêiner de destino.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Consultar dados
Carregue os dados OLTP em um dataframe para executar consultas comuns nos dados. Você pode usar várias sintaxes filtrando ou consultando os dados.
Use
spark.read
para carregar os dados OLTP em um objeto de quadro de dados. Use a mesma configuração usada anteriormente neste tutorial. Além disso, definaspark.cosmos.read.inferSchema.enabled
como true para permitir que o conector do Spark deduza o esquema amostrando os itens existentes.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Renderize o esquema dos dados carregados no dataframe usando
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Renderize as linhas de dados em que a coluna
quantity
é menor que20
. Use as funçõeswhere
eshow
para executar essa consulta.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Renderize a primeira linha de dados em que a coluna
clearance
é true. Use a funçãofilter
para executar essa consulta.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Renderize cinco linhas de dados sem filtro ou truncamento. Use a função
show
para personalizar a aparência e o número de linhas renderizadas.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Consulte seus dados usando esta cadeia de consulta NoSQL bruta:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Executar operações comuns
Ao trabalhar com os dados da API para NoSQL no Spark, você pode fazer atualizações parciais ou trabalhar com os dados como um JSON bruto.
Para fazer uma atualização parcial de um item, realize estas etapas:
Copie a variável de configuração
config
existente e modifique as propriedades na nova cópia. Especificamente, configure a estratégia de gravação paraItemPatch
, desabilite o suporte em massa, defina as colunas e as operações mapeadas e, por fim, defina o tipo de operação padrão comoSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Crie variáveis para a chave de partição de item e o identificador exclusivo que você pretende ter como destino como parte desta operação de patch.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Crie um conjunto de objetos de patch para especificar o item de destino e os campos que devem ser modificados.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Crie um dataframe usando o conjunto de objetos de patch e use
write
para executar a operação de patch.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Execute uma consulta para analisar os resultados da operação de patch. O item agora será nomeado
Yamba New Surfboard
sem outras alterações.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Para trabalhar com os dados JSON brutos, realize estas etapas:
Copie a variável de configuração
config
existente e modifique as propriedades na nova cópia. Especificamente, altere o contêiner de destino paraemployees
e configure a coluna/o campocontacts
para usar dados JSON brutos.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Crie um conjunto de funcionários para ingerir no contêiner.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Crie um dataframe e use
write
para ingerir os dados do funcionário.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Renderize os dados do dataframe usando
show
. Observe que a colunacontacts
é um JSON bruto na saída.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Conteúdo relacionado
- Apache Spark
- API do Catálogo do Azure Cosmos DB
- Referência do parâmetro de configuração
- Exemplo de notebook “Dados de táxi da cidade de Nova York”
- Migrar do Spark 2.4 para o Spark 3.*
- Compatibilidade de versões
- Notas sobre a versão
- Links para download