Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, hiyerarşik ad alanına sahip bir depolama hesabındaki olayların nasıl işleneceğini gösterir.
Kullanıcının satış siparişini açıklayan virgülle ayrılmış değerler (csv) dosyasını karşıya yükleyerek Databricks Delta tablosunu doldurmasını sağlayan küçük bir çözüm oluşturacaksınız. Bir Event Grid aboneliği, bir Azure İşlevi ve Azure Databricks'teki bir İşi birbirine bağlayarak bu çözümü oluşturacaksınız.
Bu öğreticide şunları yapacaksınız:
- Azure İşlevi çağıran bir Event Grid aboneliği oluşturun.
- Bir olaydan bildirim alan ve ardından işi Azure Databricks'te çalıştıran bir Azure İşlevi oluşturun.
- Depolama hesabında bulunan Databricks Delta tablosuna müşteri siparişi ekleyen bir Databricks işi oluşturun.
Azure Databricks çalışma alanından başlayarak bu çözümü ters sırada oluşturacağız.
Önkoşullar
Hiyerarşik ad alanına (Azure Data Lake Storage) sahip bir depolama hesabı oluşturun. Bu öğreticide adlı
contosoordersbir depolama hesabı kullanılır.Bkz. Azure Data Lake Storage ile kullanmak için depolama hesabı oluşturma.
Kullanıcı hesabınızın kendisine Atanmış Depolama Blob Verileri Katkıda Bulunanı rolüne sahip olduğundan emin olun.
Bir hizmet sorumlusu oluşturun, bir istemci gizli dizisi oluşturun ve ardından hizmet sorumlusuna depolama hesabına erişim verin.
Bkz . Öğretici: Azure Data Lake Storage'a bağlanma (1- 3. Adım). Bu adımları tamamladıktan sonra kiracı kimliği, uygulama kimliği ve istemci gizli anahtarı değerlerini bir metin dosyasına yapıştırdığınızdan emin olun. Yakında lazım olacak.
Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
Satış siparişi oluştur
İlk olarak, satış siparişini açıklayan bir csv dosyası oluşturun ve ardından bu dosyayı depolama hesabına yükleyin. Daha sonra, Databricks Delta tablomuzdaki ilk satırı doldurmak için bu dosyadaki verileri kullanacaksınız.
Azure portalında yeni depolama hesabınıza gidin.
Depolama tarayıcısı-Blob kapsayıcıları-Kapsayıcı> ekle'yi seçin >
Veri kapsayıcısında input adlı bir dizin oluşturun.
Aşağıdaki metni bir metin düzenleyicisine yapıştırın.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United KingdomBu dosyayı yerel bilgisayarınıza kaydedin ve data.csv adını verin.
Depolama tarayıcısında bu dosyayı giriş klasörüne yükleyin.
Azure Databricks'te iş oluşturma
Bu bölümde şu görevleri gerçekleştireceksiniz:
- Azure Databricks çalışma alanı oluşturun.
- Bir not defteri oluşturun.
- Databricks Delta tablosu oluşturun ve doldurun.
- Databricks Delta tablosuna satır ekleyen kod ekleyin.
- Bir İş oluşturun.
Azure Databricks çalışma alanı oluşturma
Bu bölümde Azure portalını kullanarak bir Azure Databricks çalışma alanı oluşturursunuz.
Azure Databricks çalışma alanı oluşturun. Bu çalışma alanını
contoso-ordersadlandır. Bkz. Azure Databricks çalışma alanı oluşturma.Küme oluşturma. Kümeyi
customer-order-clusterolarak adlandırın. Bkz. Küme oluşturma.Bir not defteri oluşturun. Not defterini adlandırın ve not defterinin
configure-customer-tablevarsayılan dili olarak Python'ı seçin. Bkz . Not defteri oluşturma.
Databricks Delta tablosu oluşturma ve doldurma
Oluşturduğunuz not defterinde, aşağıdaki kod bloğunu kopyalayıp ilk hücreye yapıştırın, ancak bu kodu henüz çalıştırmayın.
Bu kod bloğundaki
appId,password,tenantyer tutucu değerlerini, bu öğreticinin önkoşullarını tamamlarken topladığınız değerlerle değiştirin.dbutils.widgets.text('source_file', "", "Source File") spark.conf.set("fs.azure.account.auth.type", "OAuth") spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>") spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token") adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/' inputPath = adlsPath + dbutils.widgets.get('source_file') customerTablePath = adlsPath + 'delta-tables/customers'Bu kod, source_file adlı bir pencere öğesi oluşturur. Daha sonra bu kodu çağıran ve bu pencere öğesine bir dosya yolu geçiren bir Azure İşlevi oluşturacaksınız. Bu kod ayrıca depolama hesabıyla hizmet sorumlunuzun kimliğini doğrular ve diğer hücrelerde kullanacağınız bazı değişkenler oluşturur.
Not
Üretim ayarında kimlik doğrulama anahtarınızı Azure Databricks'te depolamayı göz önünde bulundurun. Ardından, kimlik doğrulama anahtarı yerine kod bloğunuza bir arama anahtarı ekleyin.
Örneğin, şu kodspark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")satırını kullanmak yerine aşağıdaki kod satırını kullanırsınız:spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).
Bu öğreticiyi tamamladıktan sonra, bu yaklaşımın örneklerini görmek için Azure Databricks Web Sitesindeki Azure Data Lake Storage makalesine bakın.Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın ve ardından SHIFT + ENTER tuşlarına basarak kodu bu blokta çalıştırın.
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType inputSchema = StructType([ StructField("InvoiceNo", IntegerType(), True), StructField("StockCode", StringType(), True), StructField("Description", StringType(), True), StructField("Quantity", IntegerType(), True), StructField("InvoiceDate", StringType(), True), StructField("UnitPrice", DoubleType(), True), StructField("CustomerID", IntegerType(), True), StructField("Country", StringType(), True) ]) rawDataDF = (spark.read .option("header", "true") .schema(inputSchema) .csv(adlsPath + 'input') ) (rawDataDF.write .mode("overwrite") .format("delta") .saveAsTable("customer_data", path=customerTablePath))Bu kod, depolama hesabınızda Databricks Delta tablosunu oluşturur ve daha önce karşıya yüklediğiniz csv dosyasından bazı ilk verileri yükler.
Bu kod bloğu başarıyla çalıştırıldıktan sonra bu kod bloğunu not defterinizden kaldırın.
Databricks Delta tablosuna satır ekleyen kod ekleme
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın, ancak bu hücreyi çalıştırmayın.
upsertDataDF = (spark .read .option("header", "true") .csv(inputPath) ) upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")Bu kod, csv dosyasındaki verileri kullanarak verileri geçici bir tablo görünümüne ekler. Bu csv dosyasının yolu, önceki bir adımda oluşturduğunuz giriş pencere öğesinden gelir.
Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın. Bu kod, geçici tablo görünümünün içeriğini Databricks Delta tablosuyla birleştirir.
%sql MERGE INTO customer_data cd USING customer_data_to_upsert cu ON cd.CustomerID = cu.CustomerID WHEN MATCHED THEN UPDATE SET cd.StockCode = cu.StockCode, cd.Description = cu.Description, cd.InvoiceNo = cu.InvoiceNo, cd.Quantity = cu.Quantity, cd.InvoiceDate = cu.InvoiceDate, cd.UnitPrice = cu.UnitPrice, cd.Country = cu.Country WHEN NOT MATCHED THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country) VALUES ( cu.InvoiceNo, cu.StockCode, cu.Description, cu.Quantity, cu.InvoiceDate, cu.UnitPrice, cu.CustomerID, cu.Country)
İş Oluşturma
Daha önce oluşturduğunuz not defterini çalıştıran bir İş oluşturun. Daha sonra, bir olay tetiklendiğinde bu işi çalıştıran bir Azure İşlevi oluşturacaksınız.
Yeni-İş'i> seçin.
İşe bir ad verin, oluşturduğunuz ve kümelediğiniz not defterini seçin. Ardından, işi oluşturmak için Oluştur'u seçin.
Azure İşlevi oluşturma
İşi çalıştıran bir Azure İşlevi oluşturun.
Azure Databricks çalışma alanınızda üst çubukta Azure Databricks kullanıcı adınıza tıklayın ve açılan listeden Kullanıcı Ayarları'nı seçin.
Erişim belirteçleri sekmesinde Yeni belirteç oluştur'a tıklayın.
Görüntülenen belirteci kopyalayın ve bitti'ye tıklayın.
Databricks çalışma alanının üst köşesinde kişiler simgesini ve ardından Kullanıcı ayarları'nı seçin.
Yeni belirteç oluştur düğmesini ve ardından Oluştur düğmesini seçin.
Belirteci güvenli bir yere kopyaladığınızdan emin olun. Azure İşlevinizin, İşi çalıştırabilmesi için Databricks ile kimlik doğrulaması yapmak için bu belirtecin olması gerekir.
Azure portalı menüsünde veya Giriş sayfasında Kaynak oluştur'u seçin.
Yeni sayfasında İşlem> seçin.
İşlev Uygulaması Oluştur sayfasının Temel bilgiler sekmesinde bir kaynak grubu seçin ve ardından aşağıdaki ayarları değiştirin veya doğrulayın:
Ayar Value İşlev Uygulamasının adı contosoorder Çalışma zamanı yığını .NET Yayımlama Kod İşletim Sistemi Windows Plan türü Kullanım (Sunucusuz) Gözden geçir ve oluştur'u seçin ve ardından Oluştur seçeneğini belirleyin.
Dağıtım tamamlandığında Kaynağa git'i seçerek İşlev Uygulamasının genel bakış sayfasını açın.
Ayarlar grubunda Yapılandırma'yı seçin.
Her ayarı eklemek için Uygulama Ayarları sayfasında Yeni uygulama ayarı düğmesini seçin.
Aşağıdaki ayarları ekleyin:
Ayar adı Value DBX_INSTANCE Databricks çalışma alanınızın bölgesi. Örneğin: westus2.azuredatabricks.netDBX_PAT Daha önce oluşturduğunuz kişisel erişim belirteci. DBX_JOB_ID Çalışan işin tanımlayıcısı. Bu ayarları işlemek için Kaydet'i seçin.
İşlevler grubunda İşlevler'ive ardından Oluştur'u seçin.
Azure Event Grid Tetikleyicisi'ni seçin.
İstenirse Microsoft.Azure.WebJobs.Extensions.EventGrid uzantısını yükleyin. Yüklemeniz gerekiyorsa işlevi oluşturmak için Azure Event Grid Tetikleyicisi'ni yeniden seçmeniz gerekir.
Yeni İşlev bölmesi görüntülenir.
Yeni İşlev bölmesinde işlevi UpsertOrder olarak adlandırın ve oluştur düğmesini seçin.
Kod dosyasının içeriğini bu kodla değiştirin ve kaydet düğmesini seçin:
#r "Azure.Messaging.EventGrid" #r "System.Memory.Data" #r "Newtonsoft.Json" #r "System.Text.Json" using Azure.Messaging.EventGrid; using Azure.Messaging.EventGrid.SystemEvents; using Newtonsoft.Json; using Newtonsoft.Json.Linq; private static HttpClient httpClient = new HttpClient(); public static async Task Run(EventGridEvent eventGridEvent, ILogger log) { log.LogInformation("Event Subject: " + eventGridEvent.Subject); log.LogInformation("Event Topic: " + eventGridEvent.Topic); log.LogInformation("Event Type: " + eventGridEvent.EventType); log.LogInformation(eventGridEvent.Data.ToString()); if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") { StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>(); if (fileData.Api == "FlushWithClose") { log.LogInformation("Triggering Databricks Job for file: " + fileData.Url); var fileUrl = new Uri(fileData.Url); var httpRequestMessage = new HttpRequestMessage { Method = HttpMethod.Post, RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))), Headers = { { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)}, { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" } }, Content = new StringContent(JsonConvert.SerializeObject(new { job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process), notebook_params = new { source_file = String.Join("", fileUrl.Segments.Skip(2)) } })) }; var response = await httpClient.SendAsync(httpRequestMessage); response.EnsureSuccessStatusCode(); } } }
Bu kod, oluşturulan depolama olayı hakkındaki bilgileri ayrıştırıp olayı tetikleyen dosyanın URL'sini içeren bir istek iletisi oluşturur. İletinin bir parçası olarak işlev, daha önce oluşturduğunuz source_file pencere öğesine bir değer geçirir. İşlev kodu iletiyi Databricks İşi'ne gönderir ve daha önce kimlik doğrulaması olarak aldığınız belirteci kullanır.
Event Grid aboneliği oluşturma
Bu bölümde, dosyalar depolama hesabına yüklendiğinde Azure İşlevini çağıran bir Event Grid aboneliği oluşturacaksınız.
Tümleştirme'yi seçin ve tümleştirme sayfasında Event Grid Tetikleyicisi'ni seçin.
Tetikleyiciyi Düzenle bölmesinde olayı
eventGridEventolarak adlandırın ve olay aboneliği oluştur'u seçin.Not
Ad
eventGridEvent, Azure İşlevi'ne geçirilen adlı parametreyle eşleşir.Olay Aboneliği Oluştur sayfasının Temel Bilgiler sekmesinde aşağıdaki ayarları değiştirin veya doğrulayın:
Ayar Value Veri Akışı Adı contoso-order-event-subscription Konu türü Depolama hesabı Kaynak Kaynak contosoorders Sistem konu adı <create any name>Olay Türlerine Filtrele Blob Oluşturuldu ve Blob Silindi Oluştur düğmesini seçin.
Event Grid aboneliğini test etme
adlı
customer-order.csvbir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra LeoneDepolama Gezgini bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.
Bir dosyanın karşıya yüklenmesi Microsoft.Storage.BlobCreated olayını tetikler. Event Grid, tüm aboneleri bu olaya bildirir. Bizim örneğimizde tek abone Azure İşlevi'dir. Azure İşlevi, hangi olayın oluştuğuna karar vermek için olay parametrelerini ayrıştırıyor. Ardından dosyanın URL'sini Databricks İşi'ne geçirir. Databricks İşi dosyayı okur ve depolama hesabınızın bulunduğu Databricks Delta tablosuna bir satır ekler.
İşin başarılı olup olmadığını denetlemek için işinizin çalıştırmalarını görüntüleyin. Tamamlanma durumunu görürsünüz. bir işin çalıştırmalarını görüntüleme hakkında daha fazla bilgi için bkz. İş için çalıştırmaları görüntüleme
Güncelleştirilmiş delta tablosunu görmek için yeni bir çalışma kitabı hücresinde bu sorguyu bir hücrede çalıştırın.
%sql select * from customer_dataDöndürülen tabloda en son kayıt gösterilir.
Bu kaydı güncelleştirmek için adlı
customer-order-update.csvbir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country 536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra LeoneBu csv dosyası, sipariş miktarının olarak değiştirilmesi
22822dışında önceki dosyayla neredeyse aynıdır.Depolama Gezgini bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.
selectGüncelleştirilmiş delta tablosunu görmek için sorguyu yeniden çalıştırın.%sql select * from customer_dataDöndürülen tabloda güncelleştirilmiş kayıt gösterilir.
Kaynakları temizleme
Artık gerekli olmadığında kaynak grubunu ve tüm ilgili kaynakları silin. Bunu yapmak için depolama hesabının kaynak grubunu seçin ve Sil'i seçin.