Udostępnij za pośrednictwem


Samouczek: implementowanie wzorca przechwytywania usługi Data Lake w celu zaktualizowania tabeli delta usługi Databricks

W tym samouczku pokazano, jak obsługiwać zdarzenia na koncie magazynu, które ma hierarchiczną przestrzeń nazw.

Utworzysz małe rozwiązanie, które umożliwia użytkownikowi wypełnienie tabeli delta usługi Databricks przez przekazanie pliku wartości rozdzielanych przecinkami (csv), który opisuje zamówienie sprzedaży. To rozwiązanie utworzysz, łącząc subskrypcję usługi Event Grid, funkcję platformy Azure i zadanie w usłudze Azure Databricks.

W tym samouczku wykonasz następujące czynności:

  • Utwórz subskrypcję usługi Event Grid, która wywołuje funkcję platformy Azure.
  • Utwórz funkcję platformy Azure, która odbiera powiadomienie z zdarzenia, a następnie uruchamia zadanie w usłudze Azure Databricks.
  • Utwórz zadanie usługi Databricks, które wstawia zamówienie klienta do tabeli delty usługi Databricks znajdującej się na koncie magazynu.

Utworzymy to rozwiązanie w odwrotnej kolejności, począwszy od obszaru roboczego usługi Azure Databricks.

Wymagania wstępne

Tworzenie zamówienia sprzedaży

Najpierw utwórz plik csv opisujący zamówienie sprzedaży, a następnie przekaż ten plik do konta magazynu. Później użyjesz danych z tego pliku, aby wypełnić pierwszy wiersz w tabeli delta usługi Databricks.

  1. W witrynie Azure Portal przejdź do swojego nowego konta magazynu.

  2. Wybierz pozycję Storage browser-Blob>containers-Add container (Kontenery obiektów blobusługi Storage)> i utwórz nowy kontener o nazwie data (dane).

    Zrzut ekranu przedstawiający tworzenie folderu w przeglądarce magazynu.

  3. W kontenerze danych utwórz katalog o nazwie input.

  4. Wklej następujący tekst do edytora tekstów.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Zapisz ten plik na komputerze lokalnym i nadaj mu nazwę data.csv.

  6. W przeglądarce magazynu przekaż ten plik do folderu wejściowego .

Tworzenie zadania w usłudze Azure Databricks

W tej sekcji wykonasz następujące zadania:

  • Tworzenie obszaru roboczego usługi Azure Databricks.
  • Utwórz notes.
  • Utwórz i wypełnij tabelę delty usługi Databricks.
  • Dodaj kod, który wstawia wiersze do tabeli delty usługi Databricks.
  • Utwórz zadanie.

Tworzenie obszaru roboczego usługi Azure Databricks

W tej sekcji utworzysz obszar roboczy usługi Azure Databricks przy użyciu witryny Azure Portal.

  1. Tworzenie obszaru roboczego usługi Azure Databricks. Nadaj nazwę obszarowi roboczego contoso-orders. Zobacz Tworzenie obszaru roboczego usługi Azure Databricks.

  2. Tworzenie klastra. Nadaj klastrowi customer-order-clusternazwę . Zobacz Tworzenie klastra.

  3. Utwórz notes. Nadaj notesowi configure-customer-table nazwę i wybierz język Python jako domyślny język notesu. Zobacz Tworzenie notesu.

Tworzenie i wypełnianie tabeli delty usługi Databricks

  1. W utworzonym notesie skopiuj i wklej następujący blok kodu do pierwszej komórki, ale nie uruchamiaj jeszcze tego kodu.

    Zastąp appIdwartości zastępcze , passwordtenant w tym bloku kodu wartościami zebranymi podczas wykonywania wymagań wstępnych tego samouczka.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Ten kod tworzy widżet o nazwie source_file. Później utworzysz funkcję platformy Azure, która wywołuje ten kod i przekazuje ścieżkę pliku do tego widżetu. Ten kod uwierzytelnia również jednostkę usługi przy użyciu konta magazynu i tworzy pewne zmienne, które będą używane w innych komórkach.

    Uwaga

    W środowisku produkcyjnym rozważ przechowywanie klucza uwierzytelniania w usłudze Azure Databricks. Następnie dodaj do bloku kodu klucz wyszukiwania zamiast klucza uwierzytelniania.

    Na przykład zamiast używać tego wiersza kodu: spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")użyj następującego wiersza kodu: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Po ukończeniu tego samouczka zapoznaj się z artykułem Azure Data Lake Storage Gen2 w witrynie internetowej usługi Azure Databricks, aby zapoznać się z przykładami tego podejścia.

  2. Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

  3. Skopiuj i wklej następujący blok kodu do innej komórki, a następnie naciśnij klawisze SHIFT + ENTER , aby uruchomić kod w tym bloku.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Ten kod tworzy tabelę delta usługi Databricks na koncie magazynu, a następnie ładuje dane początkowe z przekazanego wcześniej pliku CSV.

  4. Po pomyślnym uruchomieniu tego bloku kodu usuń ten blok kodu z notesu.

Dodawanie kodu, który wstawia wiersze do tabeli delty usługi Databricks

  1. Skopiuj i wklej następujący blok kodu do innej komórki, ale nie uruchamiaj tej komórki.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Ten kod wstawia dane do tymczasowego widoku tabeli przy użyciu danych z pliku CSV. Ścieżka do tego pliku CSV pochodzi z widżetu wejściowego utworzonego we wcześniejszym kroku.

  2. Skopiuj i wklej następujący blok kodu do innej komórki. Ten kod scala zawartość tymczasowego widoku tabeli z tabelą delta usługi Databricks.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

Tworzenie zadania

Utwórz zadanie uruchamiające utworzony wcześniej notes. Później utworzysz funkcję platformy Azure, która uruchamia to zadanie po wywołaniu zdarzenia.

  1. Wybierz pozycję Nowe>zadanie.

  2. Nadaj zadaniu nazwę, wybierz utworzony notes i klaster. Następnie wybierz pozycję Utwórz , aby utworzyć zadanie.

Tworzenie funkcji platformy Azure

Utwórz funkcję platformy Azure, która uruchamia zadanie.

  1. W obszarze roboczym usługi Azure Databricks kliknij nazwę użytkownika usługi Azure Databricks na górnym pasku, a następnie z listy rozwijanej wybierz pozycję Ustawienia użytkownika.

  2. Na karcie Tokeny dostępu wybierz pozycję Generuj nowy token.

  3. Skopiuj wyświetlony token, a następnie kliknij przycisk Gotowe.

  4. W górnym rogu obszaru roboczego usługi Databricks wybierz ikonę osób, a następnie wybierz pozycję Ustawienia użytkownika.

    Zarządzanie ustawieniami użytkownika konta

  5. Wybierz przycisk Generuj nowy token , a następnie wybierz przycisk Generuj .

    Pamiętaj, aby skopiować token do bezpiecznego miejsca. Funkcja platformy Azure wymaga tego tokenu do uwierzytelnienia w usłudze Databricks, aby można było uruchomić zadanie.

  6. W menu witryny Azure Portal lub na stronie głównej wybierz pozycję Utwórz zasób.

  7. Na stronie Nowy wybierz pozycjęAplikacja funkcjiobliczeniowej>.

  8. Na karcie Podstawy strony Tworzenie aplikacji funkcji wybierz grupę zasobów, a następnie zmień lub zweryfikuj następujące ustawienia:

    Ustawienie Wartość
    Nazwa aplikacji funkcji contosoorder
    Stos środowiska uruchomieniowego .NET
    Publikowanie Kod
    System operacyjny Windows
    Typ planu Zużycie (bezserwerowe)
  9. Wybierz pozycję Przejrzyj i utwórz, a następnie wybierz pozycję Utwórz.

    Po zakończeniu wdrażania wybierz pozycję Przejdź do zasobu , aby otworzyć stronę przeglądu aplikacji funkcji.

  10. W grupie Ustawienia wybierz pozycję Konfiguracja.

  11. Na stronie Ustawienia aplikacji wybierz przycisk Nowe ustawienie aplikacji , aby dodać każde ustawienie.

    Dodaj ustawienie konfiguracji Dodaj

    Dodaj następujące ustawienia:

    Nazwa ustawienia Wartość
    DBX_INSTANCE Region obszaru roboczego usługi Databricks. Na przykład: westus2.azuredatabricks.net
    DBX_PAT Osobisty token dostępu wygenerowany wcześniej.
    DBX_JOB_ID Identyfikator uruchomionego zadania.
  12. Wybierz pozycję Zapisz , aby zatwierdzić te ustawienia.

  13. W grupie Funkcje wybierz pozycję Funkcje, a następnie wybierz pozycję Utwórz.

  14. Wybierz pozycję Azure Event Grid Trigger (Wyzwalacz Azure Event Grid).

    Zainstaluj rozszerzenie Microsoft.Azure.WebJobs.Extensions.EventGrid , jeśli zostanie wyświetlony monit o to. Jeśli musisz go zainstalować, musisz ponownie wybrać Azure Event Grid Trigger, aby utworzyć funkcję.

    Zostanie wyświetlone okienko Nowa funkcja .

  15. W okienku Nowa funkcja nadaj funkcji nazwę UpsertOrder, a następnie wybierz przycisk Utwórz .

  16. Zastąp zawartość pliku kodu tym kodem, a następnie wybierz przycisk Zapisz :

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Ten kod analizuje informacje o zgłoszonym zdarzeniu magazynu, a następnie tworzy komunikat żądania z adresem URL pliku, który wyzwolił zdarzenie. W ramach komunikatu funkcja przekazuje wartość do utworzonego wcześniej widżetu source_file . kod funkcji wysyła komunikat do zadania usługi Databricks i używa tokenu uzyskanego wcześniej jako uwierzytelnianie.

Tworzenie subskrypcji usługi Event Grid

W tej sekcji utworzysz subskrypcję usługi Event Grid, która wywołuje funkcję platformy Azure po przekazaniu plików na konto magazynu.

  1. Wybierz pozycję Integracja, a następnie na stronie Integracja wybierz pozycję Wyzwalacz usługi Event Grid.

  2. W okienku Edytowanie wyzwalacza nadaj zdarzeniu eventGridEventnazwę , a następnie wybierz pozycję Utwórz subskrypcję zdarzeń.

    Uwaga

    Nazwa eventGridEvent jest zgodna z parametrem o nazwie, który jest przekazywany do funkcji platformy Azure.

  3. Na karcie Podstawowe na stronie Tworzenie subskrypcji zdarzeń zmień lub zweryfikuj następujące ustawienia:

    Ustawienie Wartość
    Nazwa contoso-order-event-subscription
    Typ tematu Konto magazynu
    Zasób źródłowy contosoorders
    Nazwa tematu systemowego <create any name>
    Filtrowanie do typów zdarzeń Utworzony obiekt blob i usunięty obiekt blob
  4. Wybierz przycisk Utwórz.

Testowanie subskrypcji usługi Event Grid

  1. Utwórz plik o nazwie customer-order.csv, wklej następujące informacje do tego pliku i zapisz go na komputerze lokalnym.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. W Eksplorator usługi Storage przekaż ten plik do folderu wejściowego konta magazynu.

    Przekazanie pliku powoduje zgłoszenie zdarzenia Microsoft.Storage.BlobCreated . Usługa Event Grid powiadamia wszystkich subskrybentów o tym zdarzeniu. W naszym przypadku funkcja platformy Azure jest jedynym subskrybentem. Funkcja platformy Azure analizuje parametry zdarzenia, aby określić, które zdarzenie wystąpiło. Następnie przekazuje adres URL pliku do zadania usługi Databricks. Zadanie usługi Databricks odczytuje plik i dodaje wiersz do tabeli delty usługi Databricks, która znajduje się na koncie magazynu.

  3. Aby sprawdzić, czy zadanie zakończyło się pomyślnie, wyświetl uruchomienia zadania. Zostanie wyświetlony stan ukończenia. Aby uzyskać więcej informacji na temat wyświetlania przebiegów dla zadania, zobacz Wyświetlanie przebiegów zadania

  4. W nowej komórce skoroszytu uruchom to zapytanie w komórce, aby wyświetlić zaktualizowaną tabelę delty.

    %sql select * from customer_data
    

    Zwrócona tabela zawiera najnowszy rekord.

    Najnowszy rekord jest wyświetlany w tabeli Najnowszy

  5. Aby zaktualizować ten rekord, utwórz plik o nazwie customer-order-update.csv, wklej następujące informacje do tego pliku i zapisz go na komputerze lokalnym.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Ten plik CSV jest prawie identyczny z poprzednim, z wyjątkiem ilości zamówienia został zmieniony z 228 na 22.

  6. W Eksplorator usługi Storage przekaż ten plik do folderu wejściowego konta magazynu.

  7. Uruchom ponownie zapytanie, select aby wyświetlić zaktualizowaną tabelę delty.

    %sql select * from customer_data
    

    Zwrócona tabela zawiera zaktualizowany rekord.

    Zaktualizowany rekord jest wyświetlany w tabeli

Czyszczenie zasobów

Gdy grupa zasobów i wszystkie pokrewne zasoby nie będą już potrzebne, usuń je. W tym celu zaznacz grupę zasobów konta magazynu i wybierz pozycję Usuń.

Następne kroki