Öğretici: Databricks Delta tablosunu güncelleştirmek için data lake yakalama desenini uygulama
Bu öğreticide, hiyerarşik ad alanına sahip bir depolama hesabında olayları işleme gösterilmektedir.
Kullanıcının satış siparişini açıklayan virgülle ayrılmış değerler (csv) dosyasını karşıya yükleyerek Databricks Delta tablosunu doldurmasını sağlayan küçük bir çözüm oluşturacaksınız. Bir Event Grid aboneliği, bir Azure İşlevi ve Azure Databricks'teki bir işi birbirine bağlayarak bu çözümü oluşturacaksınız.
Bu öğreticide şunları yapacaksınız:
- Azure İşlevi çağıran bir Event Grid aboneliği oluşturun.
- Bir olaydan bildirim alan ve ardından işi Azure Databricks'te çalıştıran bir Azure İşlevi oluşturun.
- Depolama hesabında bulunan Databricks Delta tablosuna müşteri siparişi ekleyen bir Databricks işi oluşturun.
Bu çözümü Azure Databricks çalışma alanından başlayarak ters sırada oluşturacağız.
Önkoşullar
Hiyerarşik ad alanına (Azure Data Lake Storage 2. Nesil) sahip bir depolama hesabı oluşturun. Bu öğreticide adlı
contosoorders
bir depolama hesabı kullanılır.Bkz. Azure Data Lake Storage 2. Nesil ile kullanmak için depolama hesabı oluşturma.
Kullanıcı hesabınıza Depolama Blobu Veri Katkıda Bulunanı rolünün atandığından emin olun.
Bir hizmet sorumlusu oluşturun, bir istemci gizli dizisi oluşturun ve ardından hizmet sorumlusuna depolama hesabına erişim verin.
Bkz. Öğretici: Azure Data Lake Storage 2. Nesil bağlanma (1- 3. Adım). Bu adımları tamamladıktan sonra kiracı kimliği, uygulama kimliği ve istemci gizli anahtarı değerlerini bir metin dosyasına yapıştırdığınızdan emin olun. Bunlara yakında ihtiyacın olacak.
Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Satış siparişi oluşturma
İlk olarak, satış siparişini açıklayan bir csv dosyası oluşturun ve ardından bu dosyayı depolama hesabına yükleyin. Daha sonra, Databricks Delta tablomuzdaki ilk satırı doldurmak için bu dosyadaki verileri kullanacaksınız.
Azure portalında yeni depolama hesabınıza gidin.
Depolama tarayıcısı-Blob>kapsayıcıları-Kapsayıcı>ekle'yi seçin ve data adlı yeni bir kapsayıcı oluşturun.
Veri kapsayıcısında input adlı bir dizin oluşturun.
Aşağıdaki metni bir metin düzenleyicisine yapıştırın.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
Bu dosyayı yerel bilgisayarınıza kaydedin ve data.csvadını verin.
Depolama tarayıcısında bu dosyayı giriş klasörüne yükleyin.
Azure Databricks'te iş oluşturma
Bu bölümde şu görevleri gerçekleştireceksiniz:
- Azure Databricks çalışma alanı oluşturun.
- Bir not defteri oluşturun.
- Databricks Delta tablosu oluşturma ve doldurma.
- Databricks Delta tablosuna satır ekleyen kod ekleyin.
- bir İş oluşturun.
Azure Databricks çalışma alanı oluşturma
Bu bölümde Azure portalını kullanarak bir Azure Databricks çalışma alanı oluşturursunuz.
Azure Databricks çalışma alanı oluşturun. Bu çalışma alanını
contoso-orders
olarak adlandırabilirsiniz. Bkz. Azure Databricks çalışma alanı oluşturma.Küme oluşturma. Kümeyi
customer-order-cluster
olarak adlandırın. Bkz. Küme oluşturma.Bir not defteri oluşturun. Not defterini adlandırın ve not defterinin
configure-customer-table
varsayılan dili olarak Python'ı seçin. Bkz. Not defteri oluşturma.
Databricks Delta tablosu oluşturma ve doldurma
Oluşturduğunuz not defterinde aşağıdaki kod bloğunu kopyalayıp ilk hücreye yapıştırın, ancak bu kodu henüz çalıştırmayın.
Bu kod bloğundaki
appId
,password
,tenant
yer tutucu değerlerini, bu öğreticinin önkoşullarını tamamlarken topladığınız değerlerle değiştirin.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'
Bu kod source_file adlı bir pencere öğesi oluşturur. Daha sonra bu kodu çağıran ve bu pencere öğesine bir dosya yolu geçiren bir Azure İşlevi oluşturacaksınız. Bu kod ayrıca depolama hesabıyla hizmet sorumlunuzun kimliğini doğrular ve diğer hücrelerde kullanacağınız bazı değişkenler oluşturur.
Not
Üretim ayarında kimlik doğrulama anahtarınızı Azure Databricks'te depolamayı göz önünde bulundurun. Ardından, kimlik doğrulama anahtarı yerine kod bloğunuza bir arama anahtarı ekleyin.
Örneğin, şu kodspark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
satırını kullanmak yerine aşağıdaki kod satırını kullanırsınız:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>"))
.
Bu öğreticiyi tamamladıktan sonra, bu yaklaşımın örneklerini görmek için Azure Databricks Web Sitesindeki Azure Data Lake Storage 2. Nesil makalesine bakın.Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın ve ardından shift + ENTER tuşlarına basarak kodu bu blokta çalıştırın.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))
Bu kod, depolama hesabınızda Databricks Delta tablosunu oluşturur ve daha önce karşıya yüklediğiniz csv dosyasından bazı ilk verileri yükler.
Bu kod bloğu başarıyla çalıştırıldıktan sonra bu kod bloğunu not defterinizden kaldırın.
Databricks Delta tablosuna satır ekleyen kod ekleme
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın, ancak bu hücreyi çalıştırmayın.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
Bu kod, csv dosyasındaki verileri kullanarak verileri geçici bir tablo görünümüne ekler. Bu csv dosyasının yolu, önceki bir adımda oluşturduğunuz giriş pencere öğesinden gelir.
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın. Bu kod, geçici tablo görünümünün içeriğini Databricks Delta tablosuyla birleştirir.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
İş Oluşturma
Daha önce oluşturduğunuz not defterini çalıştıran bir İş oluşturun. Daha sonra, bir olay tetiklendiğinde bu işi çalıştıran bir Azure İşlevi oluşturacaksınız.
Yeni İş'i> seçin.
İşe bir ad verin, oluşturduğunuz not defterini seçin ve kümeleyin. Ardından, işi oluşturmak için Oluştur'u seçin.
Azure İşlevi oluşturma
İşi çalıştıran bir Azure İşlevi oluşturun.
Azure Databricks çalışma alanınızda üst çubukta Azure Databricks kullanıcı adınıza tıklayın ve açılan listeden Kullanıcı Ayarları'nı seçin.
Erişim belirteçleri sekmesinde Yeni belirteç oluştur'a tıklayın.
Görüntülenen belirteci kopyalayın ve Bitti'ye tıklayın.
Databricks çalışma alanının üst köşesinde kişiler simgesini ve ardından Kullanıcı ayarları'nı seçin.
Yeni belirteç oluştur düğmesini ve ardından Oluştur düğmesini seçin.
Belirteci güvenli bir yere kopyaladığınızdan emin olun. Azure İşlevinizin, İşi çalıştırabilmesi için Databricks ile kimlik doğrulaması için bu belirteci sağlaması gerekir.
Azure portalı menüsünde veya Giriş sayfasında Kaynak oluştur'u seçin.
Yeni sayfasında İşlem>İşlevi Uygulaması'nı seçin.
İşlev Uygulaması Oluştur sayfasının Temel sekmesinde bir kaynak grubu seçin ve ardından aşağıdaki ayarları değiştirin veya doğrulayın:
Ayar Değer İşlev Uygulamasının adı contosoorder Çalışma zamanı yığını .NET Yayımlama Kod Operating System Windows Plan türü Kullanım (Sunucusuz) Gözden Geçir + oluştur’u, sonra da Oluştur’u seçin.
Dağıtım tamamlandığında Kaynağa git'i seçerek İşlev Uygulamasının genel bakış sayfasını açın.
Ayarlar grubunda Yapılandırma'yı seçin.
Her ayarı eklemek için Uygulama Ayarları sayfasında Yeni uygulama ayarı düğmesini seçin.
Aşağıdaki ayarları ekleyin:
Ayar adı Değer DBX_INSTANCE Databricks çalışma alanınızın bölgesi. Örnek: westus2.azuredatabricks.net
DBX_PAT Daha önce oluşturduğunuz kişisel erişim belirteci. DBX_JOB_ID Çalışan işin tanımlayıcısı. Bu ayarları işlemek için Kaydet'i seçin.
İşlevler grubunda İşlevler'i ve ardından Oluştur'u seçin.
Azure Event Grid Tetikleyicisi'ni seçin.
İstenirse Microsoft.Azure.WebJobs.Extensions.EventGrid uzantısını yükleyin. Yüklemeniz gerekiyorsa, işlevi oluşturmak için tetikleyiciyi yeniden Azure Event Grid seçmeniz gerekir.
Yeni İşlev bölmesi görüntülenir.
Yeni İşlev bölmesinde işlevi UpsertOrder olarak adlandırın ve oluştur düğmesini seçin.
Kod dosyasının içeriğini bu kodla değiştirin ve kaydet düğmesini seçin :
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Bu kod, oluşturulan depolama olayı hakkındaki bilgileri ayrıştırıp olayı tetikleyen dosyanın URL'sini içeren bir istek iletisi oluşturur. İletinin bir parçası olarak işlev, daha önce oluşturduğunuz source_file pencere öğesine bir değer geçirir. işlev kodu iletiyi Databricks İşi'ne gönderir ve daha önce kimlik doğrulaması olarak aldığınız belirteci kullanır.
Event Grid aboneliği oluşturma
Bu bölümde, dosyalar depolama hesabına yüklendiğinde Azure İşlevini çağıran bir Event Grid aboneliği oluşturacaksınız.
Tümleştirme'yi seçin ve tümleştirme sayfasında Event Grid Tetikleyicisi'ni seçin.
Tetikleyiciyi Düzenle bölmesinde olayı
eventGridEvent
olarak adlandırın ve olay aboneliği oluştur'u seçin.Not
Ad
eventGridEvent
, Azure İşlevi'ne geçirilen adlı parametreyle eşleşir.Olay Aboneliği Oluştur sayfasının Temel Bilgiler sekmesinde aşağıdaki ayarları değiştirin veya doğrulayın:
Ayar Değer Ad contoso-order-event-subscription Konu türü Depolama hesabı Kaynak Kaynak contosoorders Sistem konu adı <create any name>
Olay Türlerine Göre Filtrele Blob Oluşturuldu ve Blob Silindi Oluştur düğmesini seçin.
Event Grid aboneliğini test etme
adlı
customer-order.csv
bir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
Depolama Gezgini'da bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.
Bir dosyanın karşıya yüklenmesi Microsoft.Storage.BlobCreated olayını tetikler. Event Grid, tüm aboneleri bu olaya bildirir. Bizim örneğimizde tek abone Azure İşlevi'dir. Azure İşlevi, hangi olayın gerçekleştiğini belirlemek için olay parametrelerini ayrıştırmaktadır. Daha sonra dosyanın URL'sini Databricks İşi'ne geçirir. Databricks İşi dosyayı okur ve depolama hesabınızın bulunduğu Databricks Delta tablosuna bir satır ekler.
İşin başarılı olup olmadığını denetlemek için işinizin çalıştırmalarını görüntüleyin. Tamamlanma durumunu görürsünüz. bir işin çalıştırmalarını görüntüleme hakkında daha fazla bilgi için bkz. İş için çalıştırmaları görüntüleme
Güncelleştirilmiş delta tablosunu görmek için yeni bir çalışma kitabı hücresinde bu sorguyu bir hücrede çalıştırın.
%sql select * from customer_data
Döndürülen tabloda en son kayıt gösterilir.
Bu kaydı güncelleştirmek için adlı
customer-order-update.csv
bir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
Bu csv dosyası, sipariş miktarının olarak değiştirilmesi
228
22
dışında öncekiyle neredeyse aynıdır.Depolama Gezgini'da bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.
select
Güncelleştirilmiş delta tablosunu görmek için sorguyu yeniden çalıştırın.%sql select * from customer_data
Döndürülen tabloda güncelleştirilmiş kayıt gösterilir.
Kaynakları temizleme
Artık gerekli olmadığında kaynak grubunu ve tüm ilgili kaynakları silin. Bunu yapmak için depolama hesabının kaynak grubunu seçin ve Sil'i seçin.
Sonraki adımlar
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin