Eventos
Junte-se a nós na FabCon Vegas
31 de mar., 23 - 2 de abr., 23
O melhor evento liderado pela comunidade Microsoft Fabric, Power BI, SQL e AI. 31 de março a 2 de abril de 2025.
Registre-se hoje mesmoNão há mais suporte para esse navegador.
Atualize o Microsoft Edge para aproveitar os recursos, o suporte técnico e as atualizações de segurança mais recentes.
Este tutorial mostra como manipular eventos em uma conta de armazenamento que tem um namespace hierárquico.
Você criará uma pequena solução que permite que um usuário preencha uma tabela do Databricks Delta carregando um arquivo CSV (de valores separados por vírgula) que descreve uma ordem de venda. Você criará essa solução conectando uma assinatura de Grade de Eventos, uma função do Azure e um trabalho no Azure Databricks.
Neste tutorial, você irá:
Criaremos essa solução em ordem inversa, começando com o workspace do Azure Databricks.
Criar uma conta de armazenamento que tenha um namespace hierárquico (Azure Data Lake Storage). Este tutorial usa uma conta de armazenamento nomeada contosoorders
.
Confira Criar uma conta de armazenamento para uso com o Azure Data Lake Storage.
Verifique se a sua conta de usuário tem a função Colaborador de Dados do Storage Blob atribuída a ela.
Crie uma entidade de serviço, crie um segredo do cliente e conceda à entidade de serviço acesso à conta de armazenamento.
Confira Tutorial: Conectar-se ao Azure Data Lake Storage (etapas 1 a 3). Depois de concluir essas etapas, cole a ID do locatário, a ID do aplicativo e os valores do segredo do cliente em um arquivo de texto. Você precisará deles em breve.
Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
Primeiro, crie um arquivo CSV que descreva uma ordem de venda e, em seguida, carregue esse arquivo na conta de armazenamento. Posteriormente, você usará os dados desse arquivo para preencher a primeira linha em nossa tabela do Databricks Delta.
Navegue até sua nova conta de armazenamento no portal do Azure.
Selecione Navegador de armazenamento –>Contêineres de blob –>Adicionar contêiner e crie um contêiner chamado dados.
No contêiner dados, crie um diretório chamado entrada.
Em um editor de texto, cole o texto a seguir.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Salve esse arquivo em seu computador local e dê a ele o nome data.csv.
No navegador de armazenamento, carregue esse arquivo na pasta entrada.
Nesta seção, você executará estas tarefas:
Nesta seção, você deve cria um workspace do Azure Databricks usando o Portal do Azure.
Criar um workspace do Azure Databricks. Nomeie esse espaço de trabalho como contoso-orders
. Consulte Criar um workspace do Azure Databricks.
Criar um cluster. Nomeie o cluster customer-order-cluster
. Consulte Criar um cluster.
Crie um notebook. Nomeie o notebook configure-customer-table
e escolha Python como a linguagem padrão do notebook. Confira Criar um notebook.
No notebook criado, copie e cole o bloco de código a seguir na primeira célula, mas não execute esse código ainda.
Substitua os valores de espaço reservado appId
, password
e tenant
neste código pelos valores coletados ao concluir os pré-requisitos deste tutorial.
dbutils.widgets.text('source_file', "", "Source File")
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
inputPath = adlsPath + dbutils.widgets.get('source_file')
customerTablePath = adlsPath + 'delta-tables/customers'
Esse código cria um widget chamado source_file. Posteriormente, você criará uma função do Azure que chama esse código e passa um caminho de arquivo para esse widget. Esse código também autentica a entidade de serviço com a conta de armazenamento e cria algumas variáveis que você usará em outras células.
Observação
Em uma configuração de produção, considere armazenar sua chave de autenticação no Azure Databricks. Em seguida, adicione uma chave de pesquisa ao bloco de código em vez da chave de autenticação.
Por exemplo, em vez de usar esta linha de código spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
:, você usaria a seguinte linha de código: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
Depois de concluir este tutorial, confira o artigo Azure Data Lake Storage no site Azure Databricks para ver exemplos dessa abordagem.
Pressione as teclas SHIFT+ENTER para executar o código nesse bloco.
Copie e cole o bloco de código a seguir em uma célula diferente e pressione as teclas SHIFT + ENTER para executar o código nesse bloco.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
inputSchema = StructType([
StructField("InvoiceNo", IntegerType(), True),
StructField("StockCode", StringType(), True),
StructField("Description", StringType(), True),
StructField("Quantity", IntegerType(), True),
StructField("InvoiceDate", StringType(), True),
StructField("UnitPrice", DoubleType(), True),
StructField("CustomerID", IntegerType(), True),
StructField("Country", StringType(), True)
])
rawDataDF = (spark.read
.option("header", "true")
.schema(inputSchema)
.csv(adlsPath + 'input')
)
(rawDataDF.write
.mode("overwrite")
.format("delta")
.saveAsTable("customer_data", path=customerTablePath))
Esse código cria a tabela do Databricks Delta na conta de armazenamento e, em seguida, carrega alguns dados iniciais do arquivo CSV que você carregou anteriormente.
Depois que esse bloco de código for executado com êxito, remova-o do notebook.
Copie e cole o bloco de código a seguir em uma célula diferente, mas não execute essa célula.
upsertDataDF = (spark
.read
.option("header", "true")
.csv(inputPath)
)
upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Esse código insere dados em uma exibição de tabela temporária usando dados de um arquivo CSV. O caminho para esse arquivo CSV é proveniente do widget de entrada que você criou em uma etapa anterior.
Copie e cole o seguinte bloco de código em uma célula diferente. Esse código mescla o conteúdo da exibição de tabela temporária com a tabela do Databricks Delta.
%sql
MERGE INTO customer_data cd
USING customer_data_to_upsert cu
ON cd.CustomerID = cu.CustomerID
WHEN MATCHED THEN
UPDATE SET
cd.StockCode = cu.StockCode,
cd.Description = cu.Description,
cd.InvoiceNo = cu.InvoiceNo,
cd.Quantity = cu.Quantity,
cd.InvoiceDate = cu.InvoiceDate,
cd.UnitPrice = cu.UnitPrice,
cd.Country = cu.Country
WHEN NOT MATCHED
THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
VALUES (
cu.InvoiceNo,
cu.StockCode,
cu.Description,
cu.Quantity,
cu.InvoiceDate,
cu.UnitPrice,
cu.CustomerID,
cu.Country)
Crie um trabalho que execute o notebook que você criou anteriormente. Posteriormente, você criará uma função do Azure que executará esse trabalho quando um evento for gerado.
Selecione Novo->Trabalho.
Dê um nome ao trabalho, escolha o notebook que você criou e o cluster. Em seguida, selecione Criar para criar o trabalho.
Crie uma função do Azure que executa o trabalho.
Em seu espaço de trabalho do Azure Databricks, clique no nome de usuário na barra superior e, em seguida, na lista suspensa selecione Configurações do Usuário.
Na guia Tokens de acessoselecione Gerar novo token.
Copie o token que aparece e, em seguida, clique em Concluído.
No canto superior do workspace do Databricks, escolha o ícone de pessoas e, em seguida, escolha Configurações do usuário.
Selecione o botão Gerar novo token e escolha o botão Gerar.
Verifique se você copiou o token para um local seguro. Para que a função do Azure possa executar o trabalho, ela precisa desse token para autenticar com o Databricks.
No menu do portal do Azure ou na Página inicial, selecione Criar um recurso.
Na página Novo, selecione Computação>Aplicativo de Funções.
Na guia Noções básicas da página Criar aplicativo de funções, escolha um grupo de recursos e altere ou verifique as seguintes configurações:
Configuração | Valor |
---|---|
Nome do aplicativo de funções | contosoorder |
Pilha de runtime | .NET |
Publicação | Código |
Sistema operacional | Windows |
Tipo de plano | Consumo (sem servidor) |
Selecione Examinar + Criar e, em seguida, selecione Criar.
Quando a implantação for concluída, selecione Ir para o recurso para abrir a página de visão geral do aplicativo de funções.
No grupo Configurações, selecione Configuração.
Na página Configurações do Aplicativo, escolha o botão Nova configuração de aplicativo para adicionar cada configuração.
Adicione as seguintes configurações:
Nome da configuração | Valor |
---|---|
DBX_INSTANCE | A região do workspace do Databricks. Por exemplo: westus2.azuredatabricks.net |
DBX_PAT | O token de acesso pessoal que você gerou anteriormente. |
DBX_JOB_ID | O identificador do trabalho em execução. |
Selecione Salvar para confirmar essas configurações.
No grupo Funções, selecione Functions e, em seguida, selecione Criar.
Escolha Gatilho de Grade de Eventos do Azure.
Instale a extensão Microsoft.Azure.WebJobs.Extensions.EventGrid se for solicitado que você faça isso. Se precisar instalá-la, você precisará escolher o Gatilho de Grade de Eventos do Azure novamente para criar a função.
O painel Nova Função é exibido.
No painel Nova Função, nomeie a função UpsertOrder e escolha o botão Criar.
Substitua o conteúdo do arquivo de código por este código e selecione o botão Salvar:
#r "Azure.Messaging.EventGrid"
#r "System.Memory.Data"
#r "Newtonsoft.Json"
#r "System.Text.Json"
using Azure.Messaging.EventGrid;
using Azure.Messaging.EventGrid.SystemEvents;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
private static HttpClient httpClient = new HttpClient();
public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
{
log.LogInformation("Event Subject: " + eventGridEvent.Subject);
log.LogInformation("Event Topic: " + eventGridEvent.Topic);
log.LogInformation("Event Type: " + eventGridEvent.EventType);
log.LogInformation(eventGridEvent.Data.ToString());
if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
if (fileData.Api == "FlushWithClose") {
log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
var fileUrl = new Uri(fileData.Url);
var httpRequestMessage = new HttpRequestMessage {
Method = HttpMethod.Post,
RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
Headers = {
{ System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
{ System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
},
Content = new StringContent(JsonConvert.SerializeObject(new {
job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
notebook_params = new {
source_file = String.Join("", fileUrl.Segments.Skip(2))
}
}))
};
var response = await httpClient.SendAsync(httpRequestMessage);
response.EnsureSuccessStatusCode();
}
}
}
Esse código analisa informações sobre o evento de armazenamento que foi gerado e, em seguida, cria uma mensagem de solicitação com a URL do arquivo que disparou o evento. Como parte da mensagem, a função passa um valor para o widget source_file que você criou anteriormente. O código de função envia a mensagem para o trabalho do Databricks e usa o token que você obteve anteriormente como autenticação.
Nesta seção, você criará uma assinatura da Grade de Eventos que chama a função do Azure quando os arquivos são carregados na conta de armazenamento.
Selecione Integração e, na página Integração, selecione Gatilho da Grade de Eventos.
No painel Editar gatilho, nomeie o evento eventGridEvent
e selecione Criar assinatura de evento.
Observação
O nome eventGridEvent
corresponde ao parâmetro nomeado que é passado para a função do Azure.
Na guia Noções básicas da página Criar assinatura de evento, altere ou verifique as seguintes configurações:
Configuração | Valor |
---|---|
Nome | contoso-order-event-subscription |
Tipo de tópico | Conta de armazenamento |
Recurso de Origem | contosoorders |
Nome do tópico do sistema | <create any name> |
Filtro para Tipos de Evento | Blob criado e Blob excluído |
Selecione o botão Criar.
Crie um arquivo chamado customer-order.csv
, cole as informações a seguir nesse arquivo e salve-o no computador local.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
Em Gerenciador de Armazenamento, carregue esse arquivo na pasta input da conta de armazenamento.
O upload de um arquivo gera o evento Microsoft.Storage.BlobCreated. A Grade de Eventos notifica todos os assinantes desse evento. Em nosso caso, a função do Azure é a única assinante. A função do Azure analisa os parâmetros do evento para determinar qual evento ocorreu. Em seguida, ela passa a URL do arquivo para o trabalho do Databricks. O trabalho do databricks lê o arquivo e adiciona uma linha à tabela do Databricks Delta que está localizada na conta de armazenamento.
Para verificar se o trabalho foi bem-sucedido, exiba as execuções do trabalho. Você verá um status de conclusão. Para obter mais informações sobre como exibir execuções de um trabalho, confira Exibir execuções para um trabalho
Em uma nova célula da pasta de trabalho, execute esta consulta em uma célula para ver a tabela delta atualizada.
%sql select * from customer_data
A tabela retornada mostra o registro mais recente.
Para atualizar esse registro, crie um arquivo chamado customer-order-update.csv
, cole as informações a seguir nesse arquivo e salve-o no computador local.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Esse arquivo csv é quase idêntico ao anterior, exceto que a quantidade do pedido é alterada de 228
para 22
.
Em Gerenciador de Armazenamento, carregue esse arquivo na pasta input da conta de armazenamento.
Execute a consulta select
novamente para ver a tabela delta atualizada.
%sql select * from customer_data
A tabela retornada mostra o registro atualizado.
Quando não forem mais necessários, exclua o grupo de recursos e todos os recursos relacionados. Para fazer isso, selecione o grupo de recursos da conta de armazenamento e selecione Excluir.
Eventos
Junte-se a nós na FabCon Vegas
31 de mar., 23 - 2 de abr., 23
O melhor evento liderado pela comunidade Microsoft Fabric, Power BI, SQL e AI. 31 de março a 2 de abril de 2025.
Registre-se hoje mesmo