Samouczek: implementowanie wzorca przechwytywania usługi Data Lake w celu zaktualizowania tabeli delta usługi Databricks
W tym samouczku pokazano, jak obsługiwać zdarzenia na koncie magazynu, które ma hierarchiczną przestrzeń nazw.
Utworzysz małe rozwiązanie, które umożliwia użytkownikowi wypełnienie tabeli delta usługi Databricks przez przekazanie pliku wartości rozdzielanych przecinkami (csv), który opisuje zamówienie sprzedaży. To rozwiązanie utworzysz, łącząc subskrypcję usługi Event Grid, funkcję platformy Azure i zadanie w usłudze Azure Databricks.
W tym samouczku wykonasz następujące czynności:
- Utwórz subskrypcję usługi Event Grid, która wywołuje funkcję platformy Azure.
- Utwórz funkcję platformy Azure, która odbiera powiadomienie z zdarzenia, a następnie uruchamia zadanie w usłudze Azure Databricks.
- Utwórz zadanie usługi Databricks, które wstawia zamówienie klienta do tabeli delty usługi Databricks znajdującej się na koncie magazynu.
Utworzymy to rozwiązanie w odwrotnej kolejności, począwszy od obszaru roboczego usługi Azure Databricks.
Wymagania wstępne
Utwórz konto magazynu z hierarchiczną przestrzenią nazw (Azure Data Lake Storage Gen2). W tym samouczku jest używane konto magazynu o nazwie
contosoorders
.Zobacz Tworzenie konta magazynu do użycia z Azure Data Lake Storage Gen2.
Upewnij się, że Twoje konto użytkownika ma przypisaną rolę Współautor danych obiektu blob magazynu.
Utwórz jednostkę usługi, utwórz wpis tajny klienta, a następnie przyznaj jednostce usługi dostęp do konta magazynu.
Zobacz Samouczek: nawiązywanie połączenia z Azure Data Lake Storage Gen2 (kroki od 1 do 3). Po wykonaniu tych kroków wklej wartości identyfikatora dzierżawy, identyfikatora aplikacji i klucza tajnego klienta do pliku tekstowego. Wkrótce będą potrzebne.
Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.
Tworzenie zamówienia sprzedaży
Najpierw utwórz plik csv opisujący zamówienie sprzedaży, a następnie przekaż ten plik do konta magazynu. Później użyjesz danych z tego pliku, aby wypełnić pierwszy wiersz w tabeli delta usługi Databricks.
W witrynie Azure Portal przejdź do swojego nowego konta magazynu.
Wybierz pozycję Storage browser-Blob>containers-Add container (Kontenery obiektów blobusługi Storage)> i utwórz nowy kontener o nazwie data (dane).
W kontenerze danych utwórz katalog o nazwie input.
Wklej następujący tekst do edytora tekstów.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Zapisz ten plik na komputerze lokalnym i nadaj mu nazwę data.csv.
W przeglądarce magazynu przekaż ten plik do folderu wejściowego .
Tworzenie zadania w usłudze Azure Databricks
W tej sekcji wykonasz następujące zadania:
- Tworzenie obszaru roboczego usługi Azure Databricks.
- Utwórz notes.
- Utwórz i wypełnij tabelę delty usługi Databricks.
- Dodaj kod, który wstawia wiersze do tabeli delty usługi Databricks.
- Utwórz zadanie.
Tworzenie obszaru roboczego usługi Azure Databricks
W tej sekcji utworzysz obszar roboczy usługi Azure Databricks przy użyciu witryny Azure Portal.
Tworzenie obszaru roboczego usługi Azure Databricks. Nadaj nazwę obszarowi roboczego
contoso-orders
. Zobacz Tworzenie obszaru roboczego usługi Azure Databricks.Tworzenie klastra. Nadaj klastrowi
customer-order-cluster
nazwę . Zobacz Tworzenie klastra.Utwórz notes. Nadaj notesowi
configure-customer-table
nazwę i wybierz język Python jako domyślny język notesu. Zobacz Tworzenie notesu.
Tworzenie i wypełnianie tabeli delty usługi Databricks
W utworzonym notesie skopiuj i wklej następujący blok kodu do pierwszej komórki, ale nie uruchamiaj jeszcze tego kodu.
Zastąp
appId
wartości zastępcze ,password
tenant
w tym bloku kodu wartościami zebranymi podczas wykonywania wymagań wstępnych tego samouczka.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Ten kod tworzy widżet o nazwie source_file. Później utworzysz funkcję platformy Azure, która wywołuje ten kod i przekazuje ścieżkę pliku do tego widżetu. Ten kod uwierzytelnia również jednostkę usługi przy użyciu konta magazynu i tworzy pewne zmienne, które będą używane w innych komórkach.
Uwaga
W środowisku produkcyjnym rozważ przechowywanie klucza uwierzytelniania w usłudze Azure Databricks. Następnie dodaj do bloku kodu klucz wyszukiwania zamiast klucza uwierzytelniania.
Na przykład zamiast używać tego wiersza kodu:spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
użyj następującego wiersza kodu:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
Po ukończeniu tego samouczka zapoznaj się z artykułem Azure Data Lake Storage Gen2 w witrynie internetowej usługi Azure Databricks, aby zapoznać się z przykładami tego podejścia.Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.
Skopiuj i wklej następujący blok kodu do innej komórki, a następnie naciśnij klawisze SHIFT + ENTER , aby uruchomić kod w tym bloku.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Ten kod tworzy tabelę delta usługi Databricks na koncie magazynu, a następnie ładuje dane początkowe z przekazanego wcześniej pliku CSV.
Po pomyślnym uruchomieniu tego bloku kodu usuń ten blok kodu z notesu.
Dodawanie kodu, który wstawia wiersze do tabeli delty usługi Databricks
Skopiuj i wklej następujący blok kodu do innej komórki, ale nie uruchamiaj tej komórki.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Ten kod wstawia dane do tymczasowego widoku tabeli przy użyciu danych z pliku CSV. Ścieżka do tego pliku CSV pochodzi z widżetu wejściowego utworzonego we wcześniejszym kroku.
Skopiuj i wklej następujący blok kodu do innej komórki. Ten kod scala zawartość tymczasowego widoku tabeli z tabelą delta usługi Databricks.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
Tworzenie zadania
Utwórz zadanie uruchamiające utworzony wcześniej notes. Później utworzysz funkcję platformy Azure, która uruchamia to zadanie po wywołaniu zdarzenia.
Wybierz pozycję Nowe>zadanie.
Nadaj zadaniu nazwę, wybierz utworzony notes i klaster. Następnie wybierz pozycję Utwórz , aby utworzyć zadanie.
Tworzenie funkcji platformy Azure
Utwórz funkcję platformy Azure, która uruchamia zadanie.
W obszarze roboczym usługi Azure Databricks kliknij nazwę użytkownika usługi Azure Databricks na górnym pasku, a następnie z listy rozwijanej wybierz pozycję Ustawienia użytkownika.
Na karcie Tokeny dostępu wybierz pozycję Generuj nowy token.
Skopiuj wyświetlony token, a następnie kliknij przycisk Gotowe.
W górnym rogu obszaru roboczego usługi Databricks wybierz ikonę osób, a następnie wybierz pozycję Ustawienia użytkownika.
Wybierz przycisk Generuj nowy token , a następnie wybierz przycisk Generuj .
Pamiętaj, aby skopiować token do bezpiecznego miejsca. Funkcja platformy Azure wymaga tego tokenu do uwierzytelnienia w usłudze Databricks, aby można było uruchomić zadanie.
W menu witryny Azure Portal lub na stronie głównej wybierz pozycję Utwórz zasób.
Na stronie Nowy wybierz pozycjęAplikacja funkcjiobliczeniowej>.
Na karcie Podstawy strony Tworzenie aplikacji funkcji wybierz grupę zasobów, a następnie zmień lub zweryfikuj następujące ustawienia:
Ustawienie Wartość Nazwa aplikacji funkcji contosoorder Stos środowiska uruchomieniowego .NET Publikowanie Kod System operacyjny Windows Typ planu Zużycie (bezserwerowe) Wybierz pozycję Przejrzyj i utwórz, a następnie wybierz pozycję Utwórz.
Po zakończeniu wdrażania wybierz pozycję Przejdź do zasobu , aby otworzyć stronę przeglądu aplikacji funkcji.
W grupie Ustawienia wybierz pozycję Konfiguracja.
Na stronie Ustawienia aplikacji wybierz przycisk Nowe ustawienie aplikacji , aby dodać każde ustawienie.
Dodaj następujące ustawienia:
Nazwa ustawienia Wartość DBX_INSTANCE Region obszaru roboczego usługi Databricks. Na przykład: westus2.azuredatabricks.net
DBX_PAT Osobisty token dostępu wygenerowany wcześniej. DBX_JOB_ID Identyfikator uruchomionego zadania. Wybierz pozycję Zapisz , aby zatwierdzić te ustawienia.
W grupie Funkcje wybierz pozycję Funkcje, a następnie wybierz pozycję Utwórz.
Wybierz pozycję Azure Event Grid Trigger (Wyzwalacz Azure Event Grid).
Zainstaluj rozszerzenie Microsoft.Azure.WebJobs.Extensions.EventGrid , jeśli zostanie wyświetlony monit o to. Jeśli musisz go zainstalować, musisz ponownie wybrać Azure Event Grid Trigger, aby utworzyć funkcję.
Zostanie wyświetlone okienko Nowa funkcja .
W okienku Nowa funkcja nadaj funkcji nazwę UpsertOrder, a następnie wybierz przycisk Utwórz .
Zastąp zawartość pliku kodu tym kodem, a następnie wybierz przycisk Zapisz :
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Ten kod analizuje informacje o zgłoszonym zdarzeniu magazynu, a następnie tworzy komunikat żądania z adresem URL pliku, który wyzwolił zdarzenie. W ramach komunikatu funkcja przekazuje wartość do utworzonego wcześniej widżetu source_file . kod funkcji wysyła komunikat do zadania usługi Databricks i używa tokenu uzyskanego wcześniej jako uwierzytelnianie.
Tworzenie subskrypcji usługi Event Grid
W tej sekcji utworzysz subskrypcję usługi Event Grid, która wywołuje funkcję platformy Azure po przekazaniu plików na konto magazynu.
Wybierz pozycję Integracja, a następnie na stronie Integracja wybierz pozycję Wyzwalacz usługi Event Grid.
W okienku Edytowanie wyzwalacza nadaj zdarzeniu
eventGridEvent
nazwę , a następnie wybierz pozycję Utwórz subskrypcję zdarzeń.Uwaga
Nazwa
eventGridEvent
jest zgodna z parametrem o nazwie, który jest przekazywany do funkcji platformy Azure.Na karcie Podstawowe na stronie Tworzenie subskrypcji zdarzeń zmień lub zweryfikuj następujące ustawienia:
Ustawienie Wartość Nazwa contoso-order-event-subscription Typ tematu Konto magazynu Zasób źródłowy contosoorders Nazwa tematu systemowego <create any name>
Filtrowanie do typów zdarzeń Utworzony obiekt blob i usunięty obiekt blob Wybierz przycisk Utwórz.
Testowanie subskrypcji usługi Event Grid
Utwórz plik o nazwie
customer-order.csv
, wklej następujące informacje do tego pliku i zapisz go na komputerze lokalnym.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
W Eksplorator usługi Storage przekaż ten plik do folderu wejściowego konta magazynu.
Przekazanie pliku powoduje zgłoszenie zdarzenia Microsoft.Storage.BlobCreated . Usługa Event Grid powiadamia wszystkich subskrybentów o tym zdarzeniu. W naszym przypadku funkcja platformy Azure jest jedynym subskrybentem. Funkcja platformy Azure analizuje parametry zdarzenia, aby określić, które zdarzenie wystąpiło. Następnie przekazuje adres URL pliku do zadania usługi Databricks. Zadanie usługi Databricks odczytuje plik i dodaje wiersz do tabeli delty usługi Databricks, która znajduje się na koncie magazynu.
Aby sprawdzić, czy zadanie zakończyło się pomyślnie, wyświetl uruchomienia zadania. Zostanie wyświetlony stan ukończenia. Aby uzyskać więcej informacji na temat wyświetlania przebiegów dla zadania, zobacz Wyświetlanie przebiegów zadania
W nowej komórce skoroszytu uruchom to zapytanie w komórce, aby wyświetlić zaktualizowaną tabelę delty.
%sql select * from customer_data
Zwrócona tabela zawiera najnowszy rekord.
Aby zaktualizować ten rekord, utwórz plik o nazwie
customer-order-update.csv
, wklej następujące informacje do tego pliku i zapisz go na komputerze lokalnym.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Ten plik CSV jest prawie identyczny z poprzednim, z wyjątkiem ilości zamówienia został zmieniony z
228
na22
.W Eksplorator usługi Storage przekaż ten plik do folderu wejściowego konta magazynu.
Uruchom ponownie zapytanie,
select
aby wyświetlić zaktualizowaną tabelę delty.%sql select * from customer_data
Zwrócona tabela zawiera zaktualizowany rekord.
Czyszczenie zasobów
Gdy grupa zasobów i wszystkie pokrewne zasoby nie będą już potrzebne, usuń je. W tym celu zaznacz grupę zasobów konta magazynu i wybierz pozycję Usuń.