Aracılığıyla paylaş


Öğretici: Databricks Delta tablosunu güncelleştirmek için data lake capture desenini uygulama

Bu öğreticide, hiyerarşik ad alanına sahip bir depolama hesabındaki olayların nasıl işleneceğini gösterir.

Kullanıcının satış siparişini açıklayan virgülle ayrılmış değerler (csv) dosyasını karşıya yükleyerek Databricks Delta tablosunu doldurmasını sağlayan küçük bir çözüm oluşturacaksınız. Bir Event Grid aboneliği, bir Azure İşlevi ve Azure Databricks'teki bir İşi birbirine bağlayarak bu çözümü oluşturacaksınız.

Bu öğreticide şunları yapacaksınız:

  • Azure İşlevi çağıran bir Event Grid aboneliği oluşturun.
  • Bir olaydan bildirim alan ve ardından işi Azure Databricks'te çalıştıran bir Azure İşlevi oluşturun.
  • Depolama hesabında bulunan Databricks Delta tablosuna müşteri siparişi ekleyen bir Databricks işi oluşturun.

Azure Databricks çalışma alanından başlayarak bu çözümü ters sırada oluşturacağız.

Önkoşullar

  • Hiyerarşik ad alanına (Azure Data Lake Storage) sahip bir depolama hesabı oluşturun. Bu öğreticide adlı contosoordersbir depolama hesabı kullanılır.

    Bkz. Azure Data Lake Storage ile kullanmak için depolama hesabı oluşturma.

  • Kullanıcı hesabınızın kendisine Atanmış Depolama Blob Verileri Katkıda Bulunanı rolüne sahip olduğundan emin olun.

  • Bir hizmet sorumlusu oluşturun, bir istemci gizli dizisi oluşturun ve ardından hizmet sorumlusuna depolama hesabına erişim verin.

    Bkz . Öğretici: Azure Data Lake Storage'a bağlanma (1- 3. Adım). Bu adımları tamamladıktan sonra kiracı kimliği, uygulama kimliği ve istemci gizli anahtarı değerlerini bir metin dosyasına yapıştırdığınızdan emin olun. Yakında lazım olacak.

  • Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.

Satış siparişi oluştur

İlk olarak, satış siparişini açıklayan bir csv dosyası oluşturun ve ardından bu dosyayı depolama hesabına yükleyin. Daha sonra, Databricks Delta tablomuzdaki ilk satırı doldurmak için bu dosyadaki verileri kullanacaksınız.

  1. Azure portalında yeni depolama hesabınıza gidin.

  2. Depolama tarayıcısı-Blob kapsayıcıları-Kapsayıcı> ekle'yi seçin >

    Depolama tarayıcısında klasör oluşturma işleminin ekran görüntüsü.

  3. Veri kapsayıcısında input adlı bir dizin oluşturun.

  4. Aşağıdaki metni bir metin düzenleyicisine yapıştırın.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Bu dosyayı yerel bilgisayarınıza kaydedin ve data.csv adını verin.

  6. Depolama tarayıcısında bu dosyayı giriş klasörüne yükleyin.

Azure Databricks'te iş oluşturma

Bu bölümde şu görevleri gerçekleştireceksiniz:

  • Azure Databricks çalışma alanı oluşturun.
  • Bir not defteri oluşturun.
  • Databricks Delta tablosu oluşturun ve doldurun.
  • Databricks Delta tablosuna satır ekleyen kod ekleyin.
  • Bir İş oluşturun.

Azure Databricks çalışma alanı oluşturma

Bu bölümde Azure portalını kullanarak bir Azure Databricks çalışma alanı oluşturursunuz.

  1. Azure Databricks çalışma alanı oluşturun. Bu çalışma alanını contoso-ordersadlandır. Bkz. Azure Databricks çalışma alanı oluşturma.

  2. Küme oluşturma. Kümeyi customer-order-clusterolarak adlandırın. Bkz. Küme oluşturma.

  3. Bir not defteri oluşturun. Not defterini adlandırın ve not defterinin configure-customer-table varsayılan dili olarak Python'ı seçin. Bkz . Not defteri oluşturma.

Databricks Delta tablosu oluşturma ve doldurma

  1. Oluşturduğunuz not defterinde, aşağıdaki kod bloğunu kopyalayıp ilk hücreye yapıştırın, ancak bu kodu henüz çalıştırmayın.

    Bu kod bloğundaki appId, password, tenant yer tutucu değerlerini, bu öğreticinin önkoşullarını tamamlarken topladığınız değerlerle değiştirin.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Bu kod, source_file adlı bir pencere öğesi oluşturur. Daha sonra bu kodu çağıran ve bu pencere öğesine bir dosya yolu geçiren bir Azure İşlevi oluşturacaksınız. Bu kod ayrıca depolama hesabıyla hizmet sorumlunuzun kimliğini doğrular ve diğer hücrelerde kullanacağınız bazı değişkenler oluşturur.

    Not

    Üretim ayarında kimlik doğrulama anahtarınızı Azure Databricks'te depolamayı göz önünde bulundurun. Ardından, kimlik doğrulama anahtarı yerine kod bloğunuza bir arama anahtarı ekleyin.

    Örneğin, şu kod spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")satırını kullanmak yerine aşağıdaki kod satırını kullanırsınız: spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Bu öğreticiyi tamamladıktan sonra, bu yaklaşımın örneklerini görmek için Azure Databricks Web Sitesindeki Azure Data Lake Storage makalesine bakın.

  2. Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.

  3. Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın ve ardından SHIFT + ENTER tuşlarına basarak kodu bu blokta çalıştırın.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Bu kod, depolama hesabınızda Databricks Delta tablosunu oluşturur ve daha önce karşıya yüklediğiniz csv dosyasından bazı ilk verileri yükler.

  4. Bu kod bloğu başarıyla çalıştırıldıktan sonra bu kod bloğunu not defterinizden kaldırın.

Databricks Delta tablosuna satır ekleyen kod ekleme

  1. Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın, ancak bu hücreyi çalıştırmayın.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Bu kod, csv dosyasındaki verileri kullanarak verileri geçici bir tablo görünümüne ekler. Bu csv dosyasının yolu, önceki bir adımda oluşturduğunuz giriş pencere öğesinden gelir.

  2. Aşağıdaki kod bloğunu kopyalayıp farklı bir hücreye yapıştırın. Bu kod, geçici tablo görünümünün içeriğini Databricks Delta tablosuyla birleştirir.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

İş Oluşturma

Daha önce oluşturduğunuz not defterini çalıştıran bir İş oluşturun. Daha sonra, bir olay tetiklendiğinde bu işi çalıştıran bir Azure İşlevi oluşturacaksınız.

  1. Yeni-İş'i> seçin.

  2. İşe bir ad verin, oluşturduğunuz ve kümelediğiniz not defterini seçin. Ardından, işi oluşturmak için Oluştur'u seçin.

Azure İşlevi oluşturma

İşi çalıştıran bir Azure İşlevi oluşturun.

  1. Azure Databricks çalışma alanınızda üst çubukta Azure Databricks kullanıcı adınıza tıklayın ve açılan listeden Kullanıcı Ayarları'nı seçin.

  2. Erişim belirteçleri sekmesinde Yeni belirteç oluştur'a tıklayın.

  3. Görüntülenen belirteci kopyalayın ve bitti'ye tıklayın.

  4. Databricks çalışma alanının üst köşesinde kişiler simgesini ve ardından Kullanıcı ayarları'nı seçin.

    Hesabı yönetme

  5. Yeni belirteç oluştur düğmesini ve ardından Oluştur düğmesini seçin.

    Belirteci güvenli bir yere kopyaladığınızdan emin olun. Azure İşlevinizin, İşi çalıştırabilmesi için Databricks ile kimlik doğrulaması yapmak için bu belirtecin olması gerekir.

  6. Azure portalı menüsünde veya Giriş sayfasında Kaynak oluştur'u seçin.

  7. Yeni sayfasında İşlem> seçin.

  8. İşlev Uygulaması Oluştur sayfasının Temel bilgiler sekmesinde bir kaynak grubu seçin ve ardından aşağıdaki ayarları değiştirin veya doğrulayın:

    Ayar Value
    İşlev Uygulamasının adı contosoorder
    Çalışma zamanı yığını .NET
    Yayımlama Kod
    İşletim Sistemi Windows
    Plan türü Kullanım (Sunucusuz)
  9. Gözden geçir ve oluştur'u seçin ve ardından Oluştur seçeneğini belirleyin.

    Dağıtım tamamlandığında Kaynağa git'i seçerek İşlev Uygulamasının genel bakış sayfasını açın.

  10. Ayarlar grubunda Yapılandırma'yı seçin.

  11. Her ayarı eklemek için Uygulama Ayarları sayfasında Yeni uygulama ayarı düğmesini seçin.

    Yapılandırma ayarı ekle

    Aşağıdaki ayarları ekleyin:

    Ayar adı Value
    DBX_INSTANCE Databricks çalışma alanınızın bölgesi. Örneğin: westus2.azuredatabricks.net
    DBX_PAT Daha önce oluşturduğunuz kişisel erişim belirteci.
    DBX_JOB_ID Çalışan işin tanımlayıcısı.
  12. Bu ayarları işlemek için Kaydet'i seçin.

  13. İşlevler grubunda İşlevler'ive ardından Oluştur'u seçin.

  14. Azure Event Grid Tetikleyicisi'ni seçin.

    İstenirse Microsoft.Azure.WebJobs.Extensions.EventGrid uzantısını yükleyin. Yüklemeniz gerekiyorsa işlevi oluşturmak için Azure Event Grid Tetikleyicisi'ni yeniden seçmeniz gerekir.

    Yeni İşlev bölmesi görüntülenir.

  15. Yeni İşlev bölmesinde işlevi UpsertOrder olarak adlandırın ve oluştur düğmesini seçin.

  16. Kod dosyasının içeriğini bu kodla değiştirin ve kaydet düğmesini seçin:

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Bu kod, oluşturulan depolama olayı hakkındaki bilgileri ayrıştırıp olayı tetikleyen dosyanın URL'sini içeren bir istek iletisi oluşturur. İletinin bir parçası olarak işlev, daha önce oluşturduğunuz source_file pencere öğesine bir değer geçirir. İşlev kodu iletiyi Databricks İşi'ne gönderir ve daha önce kimlik doğrulaması olarak aldığınız belirteci kullanır.

Event Grid aboneliği oluşturma

Bu bölümde, dosyalar depolama hesabına yüklendiğinde Azure İşlevini çağıran bir Event Grid aboneliği oluşturacaksınız.

  1. Tümleştirme'yi seçin ve tümleştirme sayfasında Event Grid Tetikleyicisi'ni seçin.

  2. Tetikleyiciyi Düzenle bölmesinde olayı eventGridEventolarak adlandırın ve olay aboneliği oluştur'u seçin.

    Not

    Ad eventGridEvent , Azure İşlevi'ne geçirilen adlı parametreyle eşleşir.

  3. Olay Aboneliği Oluştur sayfasının Temel Bilgiler sekmesinde aşağıdaki ayarları değiştirin veya doğrulayın:

    Ayar Value
    Veri Akışı Adı contoso-order-event-subscription
    Konu türü Depolama hesabı
    Kaynak Kaynak contosoorders
    Sistem konu adı <create any name>
    Olay Türlerine Filtrele Blob Oluşturuldu ve Blob Silindi
  4. Oluştur düğmesini seçin.

Event Grid aboneliğini test etme

  1. adlı customer-order.csvbir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. Depolama Gezgini bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.

    Bir dosyanın karşıya yüklenmesi Microsoft.Storage.BlobCreated olayını tetikler. Event Grid, tüm aboneleri bu olaya bildirir. Bizim örneğimizde tek abone Azure İşlevi'dir. Azure İşlevi, hangi olayın oluştuğuna karar vermek için olay parametrelerini ayrıştırıyor. Ardından dosyanın URL'sini Databricks İşi'ne geçirir. Databricks İşi dosyayı okur ve depolama hesabınızın bulunduğu Databricks Delta tablosuna bir satır ekler.

  3. İşin başarılı olup olmadığını denetlemek için işinizin çalıştırmalarını görüntüleyin. Tamamlanma durumunu görürsünüz. bir işin çalıştırmalarını görüntüleme hakkında daha fazla bilgi için bkz. İş için çalıştırmaları görüntüleme

  4. Güncelleştirilmiş delta tablosunu görmek için yeni bir çalışma kitabı hücresinde bu sorguyu bir hücrede çalıştırın.

    %sql select * from customer_data
    

    Döndürülen tabloda en son kayıt gösterilir.

    En son kayıt tabloda görünüyor

  5. Bu kaydı güncelleştirmek için adlı customer-order-update.csvbir dosya oluşturun, aşağıdaki bilgileri bu dosyaya yapıştırın ve yerel bilgisayarınıza kaydedin.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Bu csv dosyası, sipariş miktarının olarak değiştirilmesi 22822dışında önceki dosyayla neredeyse aynıdır.

  6. Depolama Gezgini bu dosyayı depolama hesabınızın giriş klasörüne yükleyin.

  7. select Güncelleştirilmiş delta tablosunu görmek için sorguyu yeniden çalıştırın.

    %sql select * from customer_data
    

    Döndürülen tabloda güncelleştirilmiş kayıt gösterilir.

    Güncelleştirilmiş kayıt tabloda görünüyor

Kaynakları temizleme

Artık gerekli olmadığında kaynak grubunu ve tüm ilgili kaynakları silin. Bunu yapmak için depolama hesabının kaynak grubunu seçin ve Sil'i seçin.

Sonraki adımlar