Alıştırma - Not Defterini Azure Synapse Pipelines içinde tümleştirme

Tamamlandı

Bu ünitede, eşleme veri akışı tarafından yüklenen verileri analiz etmek ve dönüştürmek ve verileri bir veri gölünde depolamak için bir Azure Synapse Spark not defteri oluşturacaksınız. Not defterinin data lake'e yazdığı verilerin klasör adını tanımlayan bir dize parametresi kabul eden bir parametre hücresi oluşturursunuz.

Ardından bu not defterini bir Synapse işlem hattına ekler ve benzersiz işlem hattı çalıştırma kimliğini not defteri parametresine geçirirsiniz; böylece işlem hattı çalıştırmasını not defteri etkinliği tarafından kaydedilen verilerle ilişkilendirebilirsiniz.

Son olarak, synapse Studio'daki İzleyici hub'ını kullanarak işlem hattı çalıştırmasını izler, çalıştırma kimliğini alır ve ardından veri gölünde depolanan ilgili dosyaları bulursunuz.

Apache Spark ve not defterleri hakkında

Apache Spark, büyük veri analizi uygulamalarının performansını artırmak için bellek içi işlemeyi destekleyen paralel bir işleme çerçevesidir. Azure Synapse Analytics’te Apache Spark, Microsoft'un buluttaki Apache Spark uygulamalarından biridir.

Synapse Studio'da Apache Spark not defteri, canlı kod, görselleştirmeler ve anlatı metni içeren dosyalar oluşturmanız için kullanabileceğiniz bir web arabirimidir. Not defterleri, fikirleri doğrulamak ve hızlı denemeler yaparak verilerinizden içgörüler elde etmek için iyi bir yerdir. Not defterleri veri hazırlama, veri görselleştirme, makine öğrenmesi ve diğer Büyük Veri senaryolarında da yaygın olarak kullanılır.

Synapse Spark not defteri oluşturma

Kullanıcı profili verilerini işlemek, birleştirmek ve içeri aktarmak için Synapse Analytics'te bir eşleme veri akışı oluşturduğunuzu varsayalım. Şimdi, her kullanıcı için hem tercih edilen hem de en iyi seçenek olan ilk beş ürünü bulmak ve son 12 ay içinde en çok satın alma işlemine sahip olmak istiyorsunuz. Ardından, genel olarak ilk beş ürünü hesaplamak istiyorsunuz.

Bu alıştırmada, bu hesaplamaları yapmak için bir Synapse Spark not defteri oluşturacaksınız.

  1. Synapse Analytics Studio' yu ()https://web.azuresynapse.net/ açın ve Veri hub'ına gidin.

    The Data menu item is highlighted.

  2. Bağlı sekmesini (1) seçin ve Azure Data Lake Storage 2. Nesil altındaki birincil data lake storage hesabını (2) genişletin. wwi-02 kapsayıcısını (3) seçin ve top-products klasörünü (4) açın. Herhangi bir Parquet dosyasına (5) sağ tıklayın, Yeni not defteri menü öğesini (6) ve ardından DataFrame'e Yükle (7) öğesini seçin. Klasörü görmüyorsanız öğesini seçin Refresh.

    The Parquet file and new notebook option are highlighted.

  3. Not defterinin Spark havuzunuza bağlı olduğundan emin olun.

    The attach to Spark pool menu item is highlighted.

  4. Klasördeki tüm Parquet dosyalarını top-products seçmek için Parquet dosya adını (1) ile *.parquetdeğiştirin. Örneğin, yol şuna benzer olmalıdır: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    The filename is highlighted.

  5. Not defterini yürütmek için not defteri araç çubuğunda Tümünü çalıştır'ı seçin.

    The cell results are displayed.

    Dekont

    Spark havuzunda ilk kez bir not defteri çalıştırdığınızda Synapse yeni bir oturum oluşturur. Bu işlem yaklaşık 3 - 5 dakika sürebilir.

    Dekont

    Yalnızca hücreyi çalıştırmak için, hücrenin üzerine gelin ve hücrenin solundaki Hücreyi çalıştır simgesini seçin veya hücreyi seçip Ctrl+Enter tuşlarına basın.

  6. Düğmeyi seçip Kod hücre öğesini seçerek +altında yeni bir hücre oluşturun. + düğmesi, soldaki not defteri hücresinin altında bulunur. Alternatif olarak, Not Defteri araç çubuğunda + Hücre menüsünü genişletebilir ve Kod hücre öğesini seçebilirsiniz.

    The Add Code menu option is highlighted.

  7. adlı topPurchasesyeni bir veri çerçevesini doldurmak için yeni hücrede aşağıdaki komutu çalıştırın, adlı top_purchasesyeni bir geçici görünüm oluşturun ve ilk 100 satırı gösterin:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Çıkış aşağıdakine benzer görünmelidir:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. SQL kullanarak yeni bir geçici görünüm oluşturmak için yeni bir hücrede aşağıdaki komutu çalıştırın:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Dekont

    Bu sorgu için çıkış yok.

    Sorgu, kaynak olarak geçici görünümü kullanır top_purchases ve her kullanıcının kayıtlarına en büyük olan ItemsPurchasedLast12Months satır numarasını uygulamak için bir row_number() over yöntem uygular. yan tümcesi where sonuçları filtreler, böylece hem hem de IsTopProductIsPreferredProduct true olarak ayarlanmış en fazla beş ürün elde ederiz. Bu, Azure Cosmos DB'de depolanan kullanıcı profiline göre bu ürünlerin de favori ürünleri olarak tanımlandığı her kullanıcı için en çok satın alınan ilk beş ürünü verir.

  9. Önceki hücrede oluşturduğunuz geçici görünümün sonuçlarını top_5_products depolayan yeni bir DataFrame oluşturmak ve görüntülemek için yeni bir hücrede aşağıdaki komutu çalıştırın:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Kullanıcı başına en çok tercih edilen beş ürünü görüntüleyen aşağıdakine benzer bir çıkış görmeniz gerekir:

    The top five preferred products are displayed per user.

  10. Hem müşteriler tarafından tercih edilen hem de en çok satın alınan ürünleri temel alarak genel olarak ilk beş ürünü hesaplayın. Bunu yapmak için aşağıdaki komutu yeni bir hücrede çalıştırın:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    Bu hücrede, tercih edilen ilk beş ürünü ürün kimliğine göre gruplandırdık, son 12 ay içinde satın alınan toplam öğeleri topladık, bu değeri azalan düzende sıraladık ve ilk beş sonucu döndürdük. Çıkışınızın aşağıdakine benzer olması gerekir:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Parametre hücresi oluşturma

Azure Synapse işlem hatları parametre hücresini arar ve bu hücreyi yürütme zamanında geçirilen parametreler için varsayılan olarak kabul eder. Yürütme altyapısı, varsayılan değerlerin üzerine yazmak için giriş parametreleriyle parametreler hücresinin altına yeni bir hücre ekler. Parametre hücresi belirlenmediğinde, eklenen hücre not defterinin en üstüne eklenir.

  1. Bu not defterini bir işlem hattından yürüteceğiz. Parquet dosyasını adlandırmak için kullanılacak bir değişken değeri ayarlayan bir runId parametre geçirmek istiyoruz. Yeni bir hücrede aşağıdaki komutu çalıştırın:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Rastgele bir GUID oluşturmak için Spark ile birlikte gelen kitaplığı kullanıyoruz uuid . değişkenini runId işlem hattı tarafından geçirilen bir parametreyle geçersiz kılmak istiyoruz. Bunu yapmak için bunu parametre hücresi olarak değiştirmemiz gerekir.

  2. Hücrenin (1) sağ üst köşesindeki eylemler üç nokta (...) simgesini ve ardından Parametre hücresini değiştir (2) seçeneğini belirleyin.

    The menu item is highlighted.

    Bu seçeneği kullandıktan sonra hücrede Parameters etiketini görürsünüz.

    The cell is configured to accept parameters.

  3. Değişkeni birincil data lake hesabındaki yolda Parquet dosya adı /top5-products/ olarak kullanmak runId için aşağıdaki kodu yeni bir hücreye yapıştırın. yolunu birincil data lake hesabınızın adıyla değiştirin YOUR_DATALAKE_NAME . Bunu bulmak için sayfanın en üstündeki Hücre 1'e(1) kaydırın. Data Lake Storage hesabını (2) yolundan kopyalayın. Bu değeri yeni hücrenin içindeki yola (3) yerine YOUR_DATALAKE_NAME yapıştırın ve ardından hücrede komutunu çalıştırın.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    The path is updated with the name of the primary data lake account.

  4. Dosyanın data lake'e yazıldığını doğrulayın. Veri hub'ına gidin ve Bağlı sekmesini (1) seçin. Birincil data lake storage hesabını genişletin ve wwi-02 kapsayıcısını (2) seçin. top5-products klasörüne (3) gidin. Dizinde Parquet dosyası için dosya adı (4) olarak GUID içeren bir klasör görmeniz gerekir.

    The parquet file is highlighted.

    Not Defteri hücresindeki veri çerçevesindeki Parquet yazma yöntemi, daha önce mevcut olmadığından bu dizini oluşturmuştur.

Not Defterini Synapse işlem hattına ekleme

Alıştırmanın başında açıkladığımız Eşleme Veri Akışı geri döndüğünüzde, Veri Akışı düzenleme işleminizin bir parçası olarak çalıştırıldıktan sonra bu not defterini yürütmek istediğinizi varsayalım. Bunu yapmak için, bu not defterini yeni bir Not Defteri etkinliği olarak bir işlem hattına eklersiniz.

  1. Not defterine dönün. Not defterinin sağ üst köşesindeki Özellikler(1) öğesini seçin ve ad (2) için girinCalculate Top 5 Products.

    The properties blade is displayed.

  2. Not defterinin sağ üst köşesindeki İşlem hattına ekle(1) öğesini ve ardından Mevcut işlem hattı (2) seçeneğini belirleyin.

    The add to pipeline button is highlighted.

  3. Kullanıcı Profili Verilerini ASA işlem hattına Yaz (1)'i ve ardından Ekle *(2)'yi seçin.

    The pipeline is selected.

  4. Synapse Studio, Notebook etkinliğini işlem hattına ekler. Not Defteri etkinliğini, Veri akışı etkinliğininsağındaki olacak şekilde yeniden düzenleyin. Veri akışı etkinliğini seçin ve Başarılıetkinliği işlem hattı bağlantısı yeşil kutusunu Not Defteri etkinliğine sürükleyin.

    The green arrow is highlighted.

    Başarı etkinliği oku, veri akışı etkinliği başarıyla çalıştırıldıktan sonra işlem hattına Not Defteri etkinliğini çalıştırmasını bildirir.

  5. Not Defteri etkinliğini (1) seçin, ardından Ayarlar sekmesini (2) seçin, Temel parametreler (3) seçeneğini genişletin ve + Yeni (4) öğesini seçin. Ad alanına (5) girinrunId. Tür (6) için Dize'yi seçin. Değer için Dinamik içerik ekle (7)'yi seçin.

    The settings are displayed.

  6. Sistem değişkenleri (1) altında İşlem hattı çalıştırma kimliği'ne tıklayın. Bu, dinamik içerik kutusuna (2) eklenir@pipeline().RunId. İletişim kutusunu kapatmak için Son 'u (3) seçin.

    The dynamic content form is displayed.

    İşlem hattı çalıştırma kimliği değeri, her işlem hattı çalıştırmasına atanmış benzersiz bir GUID'dir. Bu değeri Not Defteri parametresi olarak geçirerek Parquet dosyasının runId adı için kullanacağız. Ardından işlem hattı çalıştırma geçmişini gözden geçirip her işlem hattı çalıştırması için oluşturulan belirli Parquet dosyasını bulabiliriz.

  7. Değişikliklerinizi kaydetmek için Tümünü yayımla'yı ve ardından Yayımla'yı seçin.

    Publish all is highlighted.

  8. Yayımlama tamamlandıktan sonra Tetikleyici ekle (1)'i ve ardından Güncelleştirilmiş işlem hattını çalıştırmak için Şimdi tetikle'yi (2) seçin.

    The trigger menu item is highlighted.

  9. Tetikleyiciyi çalıştırmak için Tamam'ı seçin.

    The OK button is highlighted.

İşlem hattı çalıştırmasını izleme

İzleyici hub'ı SQL, Apache Spark ve İşlem Hatları için geçerli ve geçmiş etkinlikleri izlemenize olanak tanır.

  1. İzleyici hub'ına gidin.

    The Monitor hub menu item is selected.

  2. İşlem hattı çalıştırmaları (1) öğesini seçin ve işlem hattı çalıştırmasının başarıyla tamamlanmasını (2) bekleyin. Görünümü yenilemeniz (3) gerekebilir.

    The pipeline run succeeded.

  3. İşlem hattının etkinlik çalıştırmalarını görüntülemek için işlem hattının adını seçin.

    The pipeline name is selected.

  4. Hem Veri akışı etkinliğine hem de yeni Not Defteri etkinliğine (1) dikkat edin. İşlem hattı çalıştırma kimliği değerini (2) not edin. Bunu not defteri tarafından oluşturulan Parquet dosya adıyla karşılaştıracağız. Ayrıntılarını görüntülemek için İlk 5 Ürün Not Defterini Hesapla adını seçin (3).

    The pipeline run details are displayed.

  5. Burada Not Defteri çalıştırma ayrıntılarını görüyoruz. İşler (2) aracılığıyla ilerleme durumunun kayıttan yürütülmesini izlemek için Kayıttan Yürütme(1) öğesini seçebilirsiniz. Alt kısımda Tanılama ve Günlükler'i farklı filtre seçenekleriyle görüntüleyebilirsiniz (3). Sağ tarafta süre, Livy Kimliği, Spark havuzu ayrıntıları gibi çalıştırma ayrıntılarını görüntüleyebiliriz. Bir işin ayrıntılarını görüntülemek için Ayrıntıları görüntüle bağlantısını seçin (5).

    The run details are displayed.

  6. Spark uygulaması kullanıcı arabirimi, aşama ayrıntılarını görebildiğimiz yeni bir sekmede açılır. Aşama ayrıntılarını görüntülemek için DAG Görselleştirmesini genişletin.

    The Spark stage details are displayed.

  7. Veri hub'ına geri dönün.

    Data hub.

  8. Bağlı sekmesini (1) seçin, ardından birincil data lake storage hesabında wwi-02 kapsayıcısını (2) seçin, top5-products klasörüne (3) gidin ve Parquet dosyası için adı İşlem Hattı çalıştırma kimliğiyle eşleşen bir klasör olduğunu doğrulayın.

    The file is highlighted.

    Gördüğünüz gibi, adı daha önce not ettiğimiz İşlem Hattı çalıştırma kimliğiyle eşleşen bir dosyamız var:

    The Pipeline run ID is highlighted.

    İşlem Hattı çalıştırma kimliğini Not Defteri etkinliğindeki parametresine geçirtiğimiz için runId bu değerler eşleşmektedir.