Tutorial: Implementar o padrão de captura do data lake para atualizar uma tabela do Databricks Delta
Este tutorial mostra-lhe como lidar com eventos numa conta de armazenamento que tem um espaço de nomes hierárquico.
Irá criar uma pequena solução que permite a um utilizador preencher uma tabela do Databricks Delta ao carregar um ficheiro de valores separados por vírgulas (csv) que descreve uma encomenda de vendas. Irá criar esta solução ao ligar uma subscrição do Event Grid, uma Função do Azure e uma Tarefa no Azure Databricks.
Neste tutorial, vai:
- Crie uma subscrição do Event Grid que chame uma Função do Azure.
- Crie uma Função do Azure que receba uma notificação de um evento e, em seguida, execute a tarefa no Azure Databricks.
- Crie uma tarefa do Databricks que insira uma encomenda de cliente numa tabela do Databricks Delta localizada na conta de armazenamento.
Vamos criar esta solução por ordem inversa, a começar pela área de trabalho do Azure Databricks.
Pré-requisitos
Crie uma conta de armazenamento que tenha um espaço de nomes hierárquico (Azure Data Lake Storage Gen2). Este tutorial utiliza uma conta de armazenamento com o nome
contosoorders
.Veja Criar uma conta de armazenamento para utilizar com Azure Data Lake Storage Gen2.
Certifique-se de que a sua conta de utilizador tem a função Contribuidor de Dados de Blobs de Armazenamento atribuída à mesma.
Crie um principal de serviço, crie um segredo de cliente e, em seguida, conceda ao principal de serviço acesso à conta de armazenamento.
Veja Tutorial: Ligar ao Azure Data Lake Storage Gen2 (Passos 1 a 3). Depois de concluir estes passos, certifique-se de que cola o ID do inquilino, o ID da aplicação e os valores do segredo do cliente num ficheiro de texto. Vai precisar deles em breve.
Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
Criar uma encomenda de vendas
Primeiro, crie um ficheiro csv que descreva uma encomenda de vendas e, em seguida, carregue esse ficheiro para a conta de armazenamento. Mais tarde, irá utilizar os dados deste ficheiro para preencher a primeira linha na nossa tabela Databricks Delta.
No portal do Azure, navegue para a sua nova conta de armazenamento.
Selecione Contentores de armazenamento browser-Blob> ->Adicionar contentor e crie um novo contentor com o nome dados.
No contentor de dados , crie um diretório com o nome input.
Cole o seguinte texto num editor de texto.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Guarde este ficheiro no seu computador local e atribua-lhe o nome data.csv.
No browser de armazenamento, carregue este ficheiro para a pasta de entrada .
Criar uma tarefa no Azure Databricks
Nesta secção, irá realizar estas tarefas:
- Crie uma área de trabalho do Azure Databricks.
- Crie um bloco de notas.
- Crie e preencha uma tabela do Databricks Delta.
- Adicione código que insere linhas na tabela Delta do Databricks.
- Criar uma Tarefa.
Criar uma área de trabalho do Azure Databricks
Nesta secção, vai criar uma área de trabalho do Azure Databricks com o portal do Azure.
Crie uma área de trabalho do Azure Databricks. Atribua um nome a essa área de
contoso-orders
trabalho . Veja Criar uma área de trabalho do Azure Databricks.Criar um cluster. Dê um nome ao cluster
customer-order-cluster
. Veja Criar um cluster.Crie um bloco de notas. Atribua um nome ao bloco de notas
configure-customer-table
e selecione Python como o idioma predefinido do bloco de notas. Veja Criar um bloco de notas.
Criar e preencher uma tabela do Databricks Delta
No bloco de notas que criou, copie e cole o seguinte bloco de código na primeira célula, mas ainda não execute este código.
Substitua os
appId
valores de marcador de posição ,password
tenant
, neste bloco de código pelos valores que recolheu ao concluir os pré-requisitos deste tutorial.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Este código cria um widget com o nome source_file. Mais tarde, irá criar uma Função do Azure que chama este código e passa um caminho de ficheiro para esse widget. Este código também autentica o principal de serviço com a conta de armazenamento e cria algumas variáveis que irá utilizar noutras células.
Nota
Numa definição de produção, considere armazenar a chave de autenticação no Azure Databricks. Em seguida, adicione uma chave de pesquisa ao seu bloco de código em vez da chave de autenticação.
Por exemplo, em vez de utilizar esta linha de código:spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
, utilizaria a seguinte linha de código:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
Depois de concluir este tutorial, veja o artigo Azure Data Lake Storage Gen2 no Site do Azure Databricks para ver exemplos desta abordagem.Prima as teclas SHIFT + ENTER para executar o código neste bloco.
Copie e cole o seguinte bloco de código numa célula diferente e, em seguida, prima as teclas SHIFT + ENTER para executar o código neste bloco.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Este código cria a tabela Delta do Databricks na sua conta de armazenamento e, em seguida, carrega alguns dados iniciais do ficheiro csv que carregou anteriormente.
Depois de este bloco de código ser executado com êxito, remova este bloco de código do seu bloco de notas.
Adicionar código que insere linhas na tabela Do Databricks Delta
Copie e cole o seguinte bloco de código numa célula diferente, mas não execute esta célula.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Este código insere dados numa vista de tabela temporária com dados de um ficheiro csv. O caminho para esse ficheiro csv provém do widget de entrada que criou num passo anterior.
Copie e cole o seguinte bloco de código numa célula diferente. Este código intercala o conteúdo da vista de tabela temporária com a tabela Databricks Delta.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
Create a Job (Crie uma Tarefa)
Crie uma Tarefa que execute o bloco de notas que criou anteriormente. Mais tarde, irá criar uma Função do Azure que executa esta tarefa quando um evento é gerado.
Selecione Nova>Tarefa.
Dê um nome à tarefa, escolha o bloco de notas que criou e o cluster. Em seguida, selecione Criar para criar a tarefa.
Criar uma Função do Azure
Crie uma Função do Azure que execute a Tarefa.
Na área de trabalho do Azure Databricks, clique no nome de utilizador do Azure Databricks na barra superior e, em seguida, na lista pendente, selecione Definições do Utilizador.
No separador Tokens do Access , selecione Gerar novo token.
Copie o token apresentado e, em seguida, clique em Concluído.
No canto superior da área de trabalho do Databricks, selecione o ícone pessoas e, em seguida, selecione Definições de utilizador.
conta
Selecione o botão Gerar novo token e, em seguida, selecione o botão Gerar .
Certifique-se de que copia o token para um local seguro. A Função do Azure precisa deste token para se autenticar com o Databricks para que possa executar a Tarefa.
A partir do menu do portal do Azure ou a partir da Home page, selecione Criar um recurso.
Na página Novo, selecione >Aplicação de Funções de Computação.
No separador Noções Básicas da página Criar Aplicação de Funções , selecione um grupo de recursos e, em seguida, altere ou verifique as seguintes definições:
Definição Valor Nome da Aplicação de Funções contosoorder Pilha de runtime .NET Publicar Código Sistema Operativo Windows Tipo de plano Consumo (Sem servidor) Selecione Rever + criar e, em seguida, selecione Criar.
Quando a implementação estiver concluída, selecione Ir para recurso para abrir a página de descrição geral da Aplicação de Funções.
No grupo Definições , selecione Configuração.
Na página Definições da Aplicação , selecione o botão Nova definição da aplicação para adicionar cada definição.
Adicione as seguintes definições:
Nome da definição Valor DBX_INSTANCE A região da área de trabalho do databricks. Por exemplo: westus2.azuredatabricks.net
DBX_PAT O token de acesso pessoal que gerou anteriormente. DBX_JOB_ID O identificador da tarefa em execução. Selecione Guardar para consolidar estas definições.
No grupo Funções , selecione Funções e, em seguida, selecione Criar.
Selecione Azure Event Grid Acionador.
Instale a extensão Microsoft.Azure.WebJobs.Extensions.EventGrid se lhe for pedido para o fazer. Se tiver de instalá-lo, terá de escolher novamente Azure Event Grid Acionador para criar a função.
É apresentado o painel Nova Função .
No painel Nova Função , atribua um nome à função UpsertOrder e, em seguida, selecione o botão Criar .
Substitua o conteúdo do ficheiro de código por este código e, em seguida, selecione o botão Guardar :
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Este código analisa as informações sobre o evento de armazenamento que foi gerado e, em seguida, cria uma mensagem de pedido com o URL do ficheiro que acionou o evento. Como parte da mensagem, a função transmite um valor para o widget source_file que criou anteriormente. o código da função envia a mensagem para a Tarefa do Databricks e utiliza o token que obteve anteriormente como autenticação.
Criar uma subscrição do Event Grid
Nesta secção, irá criar uma subscrição do Event Grid que chama a Função do Azure quando os ficheiros são carregados para a conta de armazenamento.
Selecione Integração e, em seguida, na página Integração , selecione Acionador do Event Grid.
No painel Editar Acionador , atribua um nome ao evento
eventGridEvent
e, em seguida, selecione Criar subscrição de Evento.Nota
O nome
eventGridEvent
corresponde ao parâmetro denominado que é transmitido para a Função do Azure.No separador Noções Básicas da página Criar Subscrição de Eventos , altere ou verifique as seguintes definições:
Definição Valor Nome contoso-order-event-subscription Tipo de tópico Conta de armazenamento Recurso de Origem contosoorders Nome do tópico do sistema <create any name>
Filtrar para Tipos de Eventos Blob Criado e Blob Eliminado Selecione o botão Criar.
Testar a subscrição do Event Grid
Crie um ficheiro com o nome
customer-order.csv
, cole as seguintes informações nesse ficheiro e guarde-as no seu computador local.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
No Explorador de Armazenamento, carregue este ficheiro para a pasta de entrada da sua conta de armazenamento.
Carregar um ficheiro gera o evento Microsoft.Storage.BlobCreated . O Event Grid notifica todos os subscritores desse evento. No nosso caso, a Função do Azure é o único subscritor. A Função do Azure analisa os parâmetros do evento para determinar que evento ocorreu. Em seguida, transmite o URL do ficheiro para a Tarefa do Databricks. A Tarefa do Databricks lê o ficheiro e adiciona uma linha à tabela Databricks Delta que está localizada na sua conta de armazenamento.
Para verificar se a tarefa foi bem-sucedida, veja as execuções da sua tarefa. Verá um estado de conclusão. Para obter mais informações sobre como ver execuções para uma tarefa, consulte Ver execuções para uma tarefa
Numa nova célula do livro, execute esta consulta numa célula para ver a tabela delta atualizada.
%sql select * from customer_data
A tabela devolvida mostra o registo mais recente.
Para atualizar este registo, crie um ficheiro com o nome
customer-order-update.csv
, cole as seguintes informações nesse ficheiro e guarde-as no seu computador local.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Este ficheiro csv é quase idêntico ao anterior, exceto que a quantidade da encomenda é alterada de
228
para22
.No Explorador de Armazenamento, carregue este ficheiro para a pasta de entrada da sua conta de armazenamento.
Execute a
select
consulta novamente para ver a tabela delta atualizada.%sql select * from customer_data
A tabela devolvida mostra o registo atualizado.
Limpar os recursos
Quando já não forem necessários, elimine o grupo de recursos e todos os recursos relacionados. Para tal, selecione o grupo de recursos da conta de armazenamento e selecione Eliminar.