Share via


Tutorial: Implementar o padrão de captura do data lake para atualizar uma tabela do Databricks Delta

Este tutorial mostra-lhe como lidar com eventos numa conta de armazenamento que tem um espaço de nomes hierárquico.

Irá criar uma pequena solução que permite a um utilizador preencher uma tabela do Databricks Delta ao carregar um ficheiro de valores separados por vírgulas (csv) que descreve uma encomenda de vendas. Irá criar esta solução ao ligar uma subscrição do Event Grid, uma Função do Azure e uma Tarefa no Azure Databricks.

Neste tutorial, vai:

  • Crie uma subscrição do Event Grid que chame uma Função do Azure.
  • Crie uma Função do Azure que receba uma notificação de um evento e, em seguida, execute a tarefa no Azure Databricks.
  • Crie uma tarefa do Databricks que insira uma encomenda de cliente numa tabela do Databricks Delta localizada na conta de armazenamento.

Vamos criar esta solução por ordem inversa, a começar pela área de trabalho do Azure Databricks.

Pré-requisitos

Criar uma encomenda de vendas

Primeiro, crie um ficheiro csv que descreva uma encomenda de vendas e, em seguida, carregue esse ficheiro para a conta de armazenamento. Mais tarde, irá utilizar os dados deste ficheiro para preencher a primeira linha na nossa tabela Databricks Delta.

  1. No portal do Azure, navegue para a sua nova conta de armazenamento.

  2. Selecione Contentores de armazenamento browser-Blob> ->Adicionar contentor e crie um novo contentor com o nome dados.

    Captura de ecrã a mostrar a criação de uma pasta no browser de armazenamento.

  3. No contentor de dados , crie um diretório com o nome input.

  4. Cole o seguinte texto num editor de texto.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Guarde este ficheiro no seu computador local e atribua-lhe o nome data.csv.

  6. No browser de armazenamento, carregue este ficheiro para a pasta de entrada .

Criar uma tarefa no Azure Databricks

Nesta secção, irá realizar estas tarefas:

  • Crie uma área de trabalho do Azure Databricks.
  • Crie um bloco de notas.
  • Crie e preencha uma tabela do Databricks Delta.
  • Adicione código que insere linhas na tabela Delta do Databricks.
  • Criar uma Tarefa.

Criar uma área de trabalho do Azure Databricks

Nesta secção, vai criar uma área de trabalho do Azure Databricks com o portal do Azure.

  1. Crie uma área de trabalho do Azure Databricks. Atribua um nome a essa área de contoso-orderstrabalho . Veja Criar uma área de trabalho do Azure Databricks.

  2. Criar um cluster. Dê um nome ao cluster customer-order-cluster. Veja Criar um cluster.

  3. Crie um bloco de notas. Atribua um nome ao bloco de notas configure-customer-table e selecione Python como o idioma predefinido do bloco de notas. Veja Criar um bloco de notas.

Criar e preencher uma tabela do Databricks Delta

  1. No bloco de notas que criou, copie e cole o seguinte bloco de código na primeira célula, mas ainda não execute este código.

    Substitua os appIdvalores de marcador de posição , passwordtenant , neste bloco de código pelos valores que recolheu ao concluir os pré-requisitos deste tutorial.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Este código cria um widget com o nome source_file. Mais tarde, irá criar uma Função do Azure que chama este código e passa um caminho de ficheiro para esse widget. Este código também autentica o principal de serviço com a conta de armazenamento e cria algumas variáveis que irá utilizar noutras células.

    Nota

    Numa definição de produção, considere armazenar a chave de autenticação no Azure Databricks. Em seguida, adicione uma chave de pesquisa ao seu bloco de código em vez da chave de autenticação.

    Por exemplo, em vez de utilizar esta linha de código: spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>"), utilizaria a seguinte linha de código: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Depois de concluir este tutorial, veja o artigo Azure Data Lake Storage Gen2 no Site do Azure Databricks para ver exemplos desta abordagem.

  2. Prima as teclas SHIFT + ENTER para executar o código neste bloco.

  3. Copie e cole o seguinte bloco de código numa célula diferente e, em seguida, prima as teclas SHIFT + ENTER para executar o código neste bloco.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Este código cria a tabela Delta do Databricks na sua conta de armazenamento e, em seguida, carrega alguns dados iniciais do ficheiro csv que carregou anteriormente.

  4. Depois de este bloco de código ser executado com êxito, remova este bloco de código do seu bloco de notas.

Adicionar código que insere linhas na tabela Do Databricks Delta

  1. Copie e cole o seguinte bloco de código numa célula diferente, mas não execute esta célula.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Este código insere dados numa vista de tabela temporária com dados de um ficheiro csv. O caminho para esse ficheiro csv provém do widget de entrada que criou num passo anterior.

  2. Copie e cole o seguinte bloco de código numa célula diferente. Este código intercala o conteúdo da vista de tabela temporária com a tabela Databricks Delta.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

Create a Job (Crie uma Tarefa)

Crie uma Tarefa que execute o bloco de notas que criou anteriormente. Mais tarde, irá criar uma Função do Azure que executa esta tarefa quando um evento é gerado.

  1. Selecione Nova>Tarefa.

  2. Dê um nome à tarefa, escolha o bloco de notas que criou e o cluster. Em seguida, selecione Criar para criar a tarefa.

Criar uma Função do Azure

Crie uma Função do Azure que execute a Tarefa.

  1. Na área de trabalho do Azure Databricks, clique no nome de utilizador do Azure Databricks na barra superior e, em seguida, na lista pendente, selecione Definições do Utilizador.

  2. No separador Tokens do Access , selecione Gerar novo token.

  3. Copie o token apresentado e, em seguida, clique em Concluído.

  4. No canto superior da área de trabalho do Databricks, selecione o ícone pessoas e, em seguida, selecione Definições de utilizador.

    Gerir conta

  5. Selecione o botão Gerar novo token e, em seguida, selecione o botão Gerar .

    Certifique-se de que copia o token para um local seguro. A Função do Azure precisa deste token para se autenticar com o Databricks para que possa executar a Tarefa.

  6. A partir do menu do portal do Azure ou a partir da Home page, selecione Criar um recurso.

  7. Na página Novo, selecione >Aplicação de Funções de Computação.

  8. No separador Noções Básicas da página Criar Aplicação de Funções , selecione um grupo de recursos e, em seguida, altere ou verifique as seguintes definições:

    Definição Valor
    Nome da Aplicação de Funções contosoorder
    Pilha de runtime .NET
    Publicar Código
    Sistema Operativo Windows
    Tipo de plano Consumo (Sem servidor)
  9. Selecione Rever + criar e, em seguida, selecione Criar.

    Quando a implementação estiver concluída, selecione Ir para recurso para abrir a página de descrição geral da Aplicação de Funções.

  10. No grupo Definições , selecione Configuração.

  11. Na página Definições da Aplicação , selecione o botão Nova definição da aplicação para adicionar cada definição.

    Adicionar definição de configuração

    Adicione as seguintes definições:

    Nome da definição Valor
    DBX_INSTANCE A região da área de trabalho do databricks. Por exemplo: westus2.azuredatabricks.net
    DBX_PAT O token de acesso pessoal que gerou anteriormente.
    DBX_JOB_ID O identificador da tarefa em execução.
  12. Selecione Guardar para consolidar estas definições.

  13. No grupo Funções , selecione Funções e, em seguida, selecione Criar.

  14. Selecione Azure Event Grid Acionador.

    Instale a extensão Microsoft.Azure.WebJobs.Extensions.EventGrid se lhe for pedido para o fazer. Se tiver de instalá-lo, terá de escolher novamente Azure Event Grid Acionador para criar a função.

    É apresentado o painel Nova Função .

  15. No painel Nova Função , atribua um nome à função UpsertOrder e, em seguida, selecione o botão Criar .

  16. Substitua o conteúdo do ficheiro de código por este código e, em seguida, selecione o botão Guardar :

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Este código analisa as informações sobre o evento de armazenamento que foi gerado e, em seguida, cria uma mensagem de pedido com o URL do ficheiro que acionou o evento. Como parte da mensagem, a função transmite um valor para o widget source_file que criou anteriormente. o código da função envia a mensagem para a Tarefa do Databricks e utiliza o token que obteve anteriormente como autenticação.

Criar uma subscrição do Event Grid

Nesta secção, irá criar uma subscrição do Event Grid que chama a Função do Azure quando os ficheiros são carregados para a conta de armazenamento.

  1. Selecione Integração e, em seguida, na página Integração , selecione Acionador do Event Grid.

  2. No painel Editar Acionador , atribua um nome ao evento eventGridEvente, em seguida, selecione Criar subscrição de Evento.

    Nota

    O nome eventGridEvent corresponde ao parâmetro denominado que é transmitido para a Função do Azure.

  3. No separador Noções Básicas da página Criar Subscrição de Eventos , altere ou verifique as seguintes definições:

    Definição Valor
    Nome contoso-order-event-subscription
    Tipo de tópico Conta de armazenamento
    Recurso de Origem contosoorders
    Nome do tópico do sistema <create any name>
    Filtrar para Tipos de Eventos Blob Criado e Blob Eliminado
  4. Selecione o botão Criar.

Testar a subscrição do Event Grid

  1. Crie um ficheiro com o nome customer-order.csv, cole as seguintes informações nesse ficheiro e guarde-as no seu computador local.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. No Explorador de Armazenamento, carregue este ficheiro para a pasta de entrada da sua conta de armazenamento.

    Carregar um ficheiro gera o evento Microsoft.Storage.BlobCreated . O Event Grid notifica todos os subscritores desse evento. No nosso caso, a Função do Azure é o único subscritor. A Função do Azure analisa os parâmetros do evento para determinar que evento ocorreu. Em seguida, transmite o URL do ficheiro para a Tarefa do Databricks. A Tarefa do Databricks lê o ficheiro e adiciona uma linha à tabela Databricks Delta que está localizada na sua conta de armazenamento.

  3. Para verificar se a tarefa foi bem-sucedida, veja as execuções da sua tarefa. Verá um estado de conclusão. Para obter mais informações sobre como ver execuções para uma tarefa, consulte Ver execuções para uma tarefa

  4. Numa nova célula do livro, execute esta consulta numa célula para ver a tabela delta atualizada.

    %sql select * from customer_data
    

    A tabela devolvida mostra o registo mais recente.

    O registo mais recente aparece na tabela

  5. Para atualizar este registo, crie um ficheiro com o nome customer-order-update.csv, cole as seguintes informações nesse ficheiro e guarde-as no seu computador local.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Este ficheiro csv é quase idêntico ao anterior, exceto que a quantidade da encomenda é alterada de 228 para 22.

  6. No Explorador de Armazenamento, carregue este ficheiro para a pasta de entrada da sua conta de armazenamento.

  7. Execute a select consulta novamente para ver a tabela delta atualizada.

    %sql select * from customer_data
    

    A tabela devolvida mostra o registo atualizado.

    O registo atualizado aparece na tabela

Limpar os recursos

Quando já não forem necessários, elimine o grupo de recursos e todos os recursos relacionados. Para tal, selecione o grupo de recursos da conta de armazenamento e selecione Eliminar.

Passos seguintes