Sdílet prostřednictvím


Kurz: Implementace vzoru zachycení data lake pro aktualizaci tabulky Databricks Delta

V tomto kurzu se dozvíte, jak zpracovávat události v účtu úložiště, který má hierarchický obor názvů.

Vytvoříte malé řešení, které uživateli umožní naplnit tabulku Databricks Delta tak, že nahrajete soubor hodnot oddělených čárkami (CSV), který popisuje prodejní objednávku. Toto řešení vytvoříte propojením předplatného Event Gridu, funkce Azure a úlohy v Azure Databricks.

V tomto kurzu:

  • Vytvořte odběr Event Gridu, který volá funkci Azure Functions.
  • Vytvořte funkci Azure Functions, která obdrží oznámení z události a pak spustí úlohu v Azure Databricks.
  • Vytvořte úlohu Databricks, která vloží objednávku zákazníka do tabulky Databricks Delta umístěné v účtu úložiště.

Toto řešení sestavíme v obráceném pořadí, počínaje pracovním prostorem Azure Databricks.

Požadavky

Vytvoření prodejní objednávky

Nejprve vytvořte soubor CSV, který popisuje prodejní objednávku, a pak tento soubor nahrajte do účtu úložiště. Později použijete data z tohoto souboru k naplnění prvního řádku tabulky Databricks Delta.

  1. Na webu Azure Portal přejděte k novému účtu úložiště.

  2. Vyberte Prohlížeč služby Storage –>Kontejnery objektů blob –>Přidat kontejner a vytvořte nový kontejner s názvem data.

    Snímek obrazovky s vytvořením složky v prohlížeči úložiště

  3. V datovém kontejneru vytvořte adresář s názvem input.

  4. Do textového editoru vložte následující text.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Uložte tento soubor do místního počítače a pojmenujte ho data.csv.

  6. V prohlížeči úložiště nahrajte tento soubor do vstupní složky.

Vytvoření úlohy v Azure Databricks

V této části provedete tyto úlohy:

  • Vytvořte pracovní prostor Azure Databricks.
  • Vytvořte poznámkový blok.
  • Vytvořte a naplňte tabulku Databricks Delta.
  • Přidejte kód, který vloží řádky do tabulky Databricks Delta.
  • Vytvořte úlohu.

Vytvoření pracovního prostoru Azure Databricks

V této části vytvoříte pomocí portálu Azure pracovní prostor služby Azure Databricks.

  1. Vytvořte pracovní prostor Azure Databricks. Pojmenujte tento pracovní prostor contoso-orders. Viz Vytvoření pracovního prostoru Azure Databricks.

  2. Vytvořte cluster. Pojmenujte cluster customer-order-cluster. Viz Vytvoření clusteru.

  3. Vytvořte poznámkový blok. Pojmenujte poznámkový blok configure-customer-table a jako výchozí jazyk poznámkového bloku zvolte Python. Viz Vytvoření poznámkového bloku.

Vytvoření a naplnění tabulky Databricks Delta

  1. V poznámkovém bloku, který jste vytvořili, zkopírujte a vložte následující blok kódu do první buňky, ale tento kód ještě nespusouvejte.

    appIdNahraďte zástupné hodnoty , password, tenant v tomto bloku kódu hodnotami, které jste shromáždili při plnění požadavků tohoto kurzu.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Tento kód vytvoří widget s názvem source_file. Později vytvoříte funkci Azure Functions, která tento kód zavolá a předá do tohoto widgetu cestu k souboru. Tento kód také ověří váš instanční objekt s účtem úložiště a vytvoří některé proměnné, které použijete v jiných buňkách.

    Poznámka

    V produkčním nastavení zvažte uložení ověřovacího klíče v Azure Databricks. Pak do bloku kódu přidejte klíč vyhledávání místo ověřovacího klíče.

    Například místo tohoto řádku kódu: spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")byste použili následující řádek kódu: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Po dokončení tohoto kurzu najdete příklady tohoto přístupu v článku Azure Data Lake Storage Gen2 na webu Azure Databricks.

  2. Stisknutím kláves SHIFT + ENTER spusťte kód v tomto bloku.

  3. Zkopírujte a vložte následující blok kódu do jiné buňky a stisknutím kláves SHIFT + ENTER spusťte kód v tomto bloku.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Tento kód vytvoří tabulku Databricks Delta ve vašem účtu úložiště a pak načte počáteční data ze souboru CSV, který jste nahráli dříve.

  4. Po úspěšném spuštění tohoto bloku kódu odeberte tento blok kódu z poznámkového bloku.

Přidání kódu, který vloží řádky do tabulky Databricks Delta

  1. Zkopírujte a vložte následující blok kódu do jiné buňky, ale tuto buňku nespouštět.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Tento kód vloží data do dočasného zobrazení tabulky pomocí dat ze souboru CSV. Cesta k danému souboru CSV pochází ze vstupního widgetu, který jste vytvořili v předchozím kroku.

  2. Zkopírujte a vložte následující blok kódu do jiné buňky. Tento kód sloučí obsah dočasného zobrazení tabulky s tabulkou Databricks Delta.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

Vytvoření úlohy

Vytvořte úlohu, která spustí poznámkový blok, který jste vytvořili dříve. Později vytvoříte funkci Azure Functions, která tuto úlohu spustí při vyvolání události.

  1. Vyberte Nová úloha>.

  2. Pojmenujte úlohu, zvolte poznámkový blok, který jste vytvořili, a cluster. Pak vyberte Vytvořit a vytvořte úlohu.

Vytvoření funkce Azure Function

Vytvořte funkci Azure Functions, která spustí úlohu.

  1. V pracovním prostoru Azure Databricks klikněte na horním panelu na svoje uživatelské jméno Azure Databricks a pak v rozevíracím seznamu vyberte Uživatelská nastavení.

  2. Na kartě Přístupové tokeny vyberte Vygenerovat nový token.

  3. Zkopírujte token, který se zobrazí, a klikněte na Hotovo.

  4. V horním rohu pracovního prostoru Databricks zvolte ikonu lidé a pak zvolte Nastavení uživatele.

    Spravovat nastavení uživatele účtu

  5. Vyberte tlačítko Vygenerovat nový token a pak vyberte tlačítko Generovat .

    Nezapomeňte token zkopírovat na bezpečné místo. Funkce Azure Potřebuje tento token k ověření u Databricks, aby bylo možné úlohu spustit.

  6. V nabídce webu Azure Portal nebo na domovské stránce vyberte Vytvořit prostředek.

  7. Na stránce Nový vyberte Aplikace výpočetních>funkcí.

  8. Na kartě Základy na stránce Vytvořit aplikaci funkcí zvolte skupinu prostředků a pak změňte nebo ověřte následující nastavení:

    Nastavení Hodnota
    Název aplikace funkcí contosoorder
    Zásobník modulu runtime .NET
    Publikovat Kód
    Operační systém Windows
    Typ plánu Využití (bez serverů)
  9. Vyberte Zkontrolovat a vytvořit a potom vyberte Vytvořit.

    Po dokončení nasazení vyberte Přejít k prostředku a otevřete stránku s přehledem aplikace funkcí.

  10. Ve skupině Nastavení vyberte Konfigurace.

  11. Na stránce Nastavení aplikace zvolte tlačítko Nové nastavení aplikace a přidejte jednotlivá nastavení.

    Přidání nastavení konfigurace

    Přidejte následující nastavení:

    Název nastavení Hodnota
    DBX_INSTANCE Oblast pracovního prostoru databricks. Příklad: westus2.azuredatabricks.net
    DBX_PAT Token osobního přístupu, který jste vygenerovali dříve.
    DBX_JOB_ID Identifikátor spuštěné úlohy.
  12. Vyberte Uložit a potvrďte tato nastavení.

  13. Ve skupině Functions vyberte Funkce a pak vyberte Vytvořit.

  14. Zvolte aktivační událost Azure Event Grid.

    Pokud se zobrazí výzva, nainstalujte rozšíření Microsoft.Azure.WebJobs.Extensions.EventGrid . Pokud ho budete muset nainstalovat, budete muset znovu zvolit Azure Event Grid Trigger a funkci vytvořit.

    Zobrazí se podokno Nová funkce .

  15. V podokně Nová funkce pojmenujte funkci UpsertOrder a pak vyberte tlačítko Vytvořit .

  16. Nahraďte obsah souboru kódu tímto kódem a pak vyberte tlačítko Uložit :

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Tento kód parsuje informace o události úložiště, která byla vyvolána, a pak vytvoří zprávu požadavku s adresou URL souboru, který událost aktivoval. Jako součást zprávy funkce předá hodnotu widgetu source_file , který jste vytvořili dříve. kód funkce odešle zprávu do úlohy Databricks a jako ověřování použije token, který jste získali dříve.

Vytvoření odběru Event Gridu

V této části vytvoříte odběr Event Gridu, který volá funkci Azure Functions, když se soubory nahrají do účtu úložiště.

  1. Vyberte Integrace a pak na stránce Integrace vyberte Trigger Event Gridu.

  2. V podokně Upravit aktivační událost pojmenujte událost eventGridEventa pak vyberte Vytvořit odběr události.

    Poznámka

    Název eventGridEvent odpovídá parametru s názvem, který se předává do funkce Azure Functions.

  3. Na kartě Základy na stránce Vytvořit odběr události změňte nebo ověřte následující nastavení:

    Nastavení Hodnota
    Název contoso-order-event-subscription
    Typ tématu Účet úložiště
    Zdrojový prostředek contosoorders
    Název systémového tématu <create any name>
    Filtrovat na typy událostí Objekt blob se vytvořil a objekt blob se odstranil
  4. Vyberte tlačítko Vytvořit.

Testování odběru Event Gridu

  1. Vytvořte soubor s názvem customer-order.csv, vložte do něj následující informace a uložte ho do místního počítače.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. V Průzkumník služby Storage nahrajte tento soubor do vstupní složky vašeho účtu úložiště.

    Nahrání souboru vyvolá událost Microsoft.Storage.BlobCreated . Event Grid na tuto událost upozorní všechny odběratele. V našem případě je jediným předplatitelem funkce Azure Functions. Funkce Azure Functions parsuje parametry události a určí, ke které události došlo. Pak předá adresu URL souboru do úlohy Databricks. Úloha Databricks soubor přečte a přidá řádek do tabulky Databricks Delta, ve které se nachází váš účet úložiště.

  3. Pokud chcete zkontrolovat, jestli úloha proběhla úspěšně, podívejte se na spuštění úlohy. Zobrazí se stav dokončení. Další informace o zobrazení spuštění pro úlohu najdete v tématu Zobrazení spuštění pro úlohu.

  4. V nové buňce sešitu spusťte tento dotaz v buňce, aby se zobrazila aktualizovaná rozdílová tabulka.

    %sql select * from customer_data
    

    Vrácená tabulka zobrazuje nejnovější záznam.

    Poslední záznam se zobrazí v tabulce.

  5. Pokud chcete tento záznam aktualizovat, vytvořte soubor s názvem customer-order-update.csv, vložte do něj následující informace a uložte ho do místního počítače.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Tento soubor CSV je téměř identický s předchozím souborem s tím rozdílem, že množství objednávky se změní z 228 na 22.

  6. V Průzkumník služby Storage nahrajte tento soubor do vstupní složky vašeho účtu úložiště.

  7. Spusťte dotaz znovu, select aby se zobrazila aktualizovaná rozdílová tabulka.

    %sql select * from customer_data
    

    Vrácená tabulka zobrazuje aktualizovaný záznam.

    Aktualizovaný záznam se zobrazí v tabulce

Vyčištění prostředků

Pokud už je nepotřebujete, odstraňte skupinu prostředků a všechny související prostředky. Uděláte to tak, že vyberete skupinu prostředků pro účet úložiště a vyberete Odstranit.

Další kroky