Alıştırma - Not Defterini Azure Synapse Pipelines içinde tümleştirme
Bu ünitede, eşleme veri akışı tarafından yüklenen verileri analiz etmek ve dönüştürmek ve verileri bir veri gölünde depolamak için bir Azure Synapse Spark not defteri oluşturacaksınız. Not defterinin data lake'e yazdığı verilerin klasör adını tanımlayan bir dize parametresi kabul eden bir parametre hücresi oluşturursunuz.
Ardından bu not defterini bir Synapse işlem hattına ekler ve benzersiz işlem hattı çalıştırma kimliğini not defteri parametresine geçirirsiniz; böylece işlem hattı çalıştırmasını not defteri etkinliği tarafından kaydedilen verilerle ilişkilendirebilirsiniz.
Son olarak, synapse Studio'daki İzleyici hub'ını kullanarak işlem hattı çalıştırmasını izler, çalıştırma kimliğini alır ve ardından veri gölünde depolanan ilgili dosyaları bulursunuz.
Apache Spark ve not defterleri hakkında
Apache Spark, büyük veri analizi uygulamalarının performansını artırmak için bellek içi işlemeyi destekleyen paralel bir işleme çerçevesidir. Azure Synapse Analytics’te Apache Spark, Microsoft'un buluttaki Apache Spark uygulamalarından biridir.
Synapse Studio'da Apache Spark not defteri, canlı kod, görselleştirmeler ve anlatı metni içeren dosyalar oluşturmanız için kullanabileceğiniz bir web arabirimidir. Not defterleri, fikirleri doğrulamak ve hızlı denemeler yaparak verilerinizden içgörüler elde etmek için iyi bir yerdir. Not defterleri veri hazırlama, veri görselleştirme, makine öğrenmesi ve diğer Büyük Veri senaryolarında da yaygın olarak kullanılır.
Synapse Spark not defteri oluşturma
Kullanıcı profili verilerini işlemek, birleştirmek ve içeri aktarmak için Synapse Analytics'te bir eşleme veri akışı oluşturduğunuzu varsayalım. Şimdi, her kullanıcı için hem tercih edilen hem de en iyi seçenek olan ilk beş ürünü bulmak ve son 12 ay içinde en çok satın alma işlemine sahip olmak istiyorsunuz. Ardından, genel olarak ilk beş ürünü hesaplamak istiyorsunuz.
Bu alıştırmada, bu hesaplamaları yapmak için bir Synapse Spark not defteri oluşturacaksınız.
Synapse Analytics Studio' yu ()https://web.azuresynapse.net/ açın ve Veri hub'ına gidin.
Bağlı sekmesini (1) seçin ve Azure Data Lake Storage 2. Nesil altındaki birincil data lake storage hesabını (2) genişletin. wwi-02 kapsayıcısını (3) seçin ve top-products klasörünü (4) açın. Herhangi bir Parquet dosyasına (5) sağ tıklayın, Yeni not defteri menü öğesini (6) ve ardından DataFrame'e Yükle (7) öğesini seçin. Klasörü görmüyorsanız öğesini seçin
Refresh
.Not defterinin Spark havuzunuza bağlı olduğundan emin olun.
Klasördeki tüm Parquet dosyalarını
top-products
seçmek için Parquet dosya adını (1) ile*.parquet
değiştirin. Örneğin, yol şuna benzer olmalıdır:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Not defterini yürütmek için not defteri araç çubuğunda Tümünü çalıştır'ı seçin.
Dekont
Spark havuzunda ilk kez bir not defteri çalıştırdığınızda Synapse yeni bir oturum oluşturur. Bu işlem yaklaşık 3 - 5 dakika sürebilir.
Dekont
Yalnızca hücreyi çalıştırmak için, hücrenin üzerine gelin ve hücrenin solundaki Hücreyi çalıştır simgesini seçin veya hücreyi seçip Ctrl+Enter tuşlarına basın.
Düğmeyi seçip Kod hücre öğesini seçerek +altında yeni bir hücre oluşturun. + düğmesi, soldaki not defteri hücresinin altında bulunur. Alternatif olarak, Not Defteri araç çubuğunda + Hücre menüsünü genişletebilir ve Kod hücre öğesini seçebilirsiniz.
adlı
topPurchases
yeni bir veri çerçevesini doldurmak için yeni hücrede aşağıdaki komutu çalıştırın, adlıtop_purchases
yeni bir geçici görünüm oluşturun ve ilk 100 satırı gösterin:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Çıkış aşağıdakine benzer görünmelidir:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
SQL kullanarak yeni bir geçici görünüm oluşturmak için yeni bir hücrede aşağıdaki komutu çalıştırın:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Dekont
Bu sorgu için çıkış yok.
Sorgu, kaynak olarak geçici görünümü kullanır
top_purchases
ve her kullanıcının kayıtlarına en büyük olanItemsPurchasedLast12Months
satır numarasını uygulamak için birrow_number() over
yöntem uygular. yan tümcesiwhere
sonuçları filtreler, böylece hem hem deIsTopProduct
IsPreferredProduct
true olarak ayarlanmış en fazla beş ürün elde ederiz. Bu, Azure Cosmos DB'de depolanan kullanıcı profiline göre bu ürünlerin de favori ürünleri olarak tanımlandığı her kullanıcı için en çok satın alınan ilk beş ürünü verir.Önceki hücrede oluşturduğunuz geçici görünümün sonuçlarını
top_5_products
depolayan yeni bir DataFrame oluşturmak ve görüntülemek için yeni bir hücrede aşağıdaki komutu çalıştırın:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Kullanıcı başına en çok tercih edilen beş ürünü görüntüleyen aşağıdakine benzer bir çıkış görmeniz gerekir:
Hem müşteriler tarafından tercih edilen hem de en çok satın alınan ürünleri temel alarak genel olarak ilk beş ürünü hesaplayın. Bunu yapmak için aşağıdaki komutu yeni bir hücrede çalıştırın:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
Bu hücrede, tercih edilen ilk beş ürünü ürün kimliğine göre gruplandırdık, son 12 ay içinde satın alınan toplam öğeleri topladık, bu değeri azalan düzende sıraladık ve ilk beş sonucu döndürdük. Çıkışınızın aşağıdakine benzer olması gerekir:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Parametre hücresi oluşturma
Azure Synapse işlem hatları parametre hücresini arar ve bu hücreyi yürütme zamanında geçirilen parametreler için varsayılan olarak kabul eder. Yürütme altyapısı, varsayılan değerlerin üzerine yazmak için giriş parametreleriyle parametreler hücresinin altına yeni bir hücre ekler. Parametre hücresi belirlenmediğinde, eklenen hücre not defterinin en üstüne eklenir.
Bu not defterini bir işlem hattından yürüteceğiz. Parquet dosyasını adlandırmak için kullanılacak bir değişken değeri ayarlayan bir
runId
parametre geçirmek istiyoruz. Yeni bir hücrede aşağıdaki komutu çalıştırın:import uuid # Generate random GUID runId = uuid.uuid4()
Rastgele bir GUID oluşturmak için Spark ile birlikte gelen kitaplığı kullanıyoruz
uuid
. değişkeninirunId
işlem hattı tarafından geçirilen bir parametreyle geçersiz kılmak istiyoruz. Bunu yapmak için bunu parametre hücresi olarak değiştirmemiz gerekir.Hücrenin (1) sağ üst köşesindeki eylemler üç nokta (...) simgesini ve ardından Parametre hücresini değiştir (2) seçeneğini belirleyin.
Bu seçeneği kullandıktan sonra hücrede Parameters etiketini görürsünüz.
Değişkeni birincil data lake hesabındaki yolda Parquet dosya adı
/top5-products/
olarak kullanmakrunId
için aşağıdaki kodu yeni bir hücreye yapıştırın. yolunu birincil data lake hesabınızın adıyla değiştirinYOUR_DATALAKE_NAME
. Bunu bulmak için sayfanın en üstündeki Hücre 1'e(1) kaydırın. Data Lake Storage hesabını (2) yolundan kopyalayın. Bu değeri yeni hücrenin içindeki yola (3) yerineYOUR_DATALAKE_NAME
yapıştırın ve ardından hücrede komutunu çalıştırın.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Dosyanın data lake'e yazıldığını doğrulayın. Veri hub'ına gidin ve Bağlı sekmesini (1) seçin. Birincil data lake storage hesabını genişletin ve wwi-02 kapsayıcısını (2) seçin. top5-products klasörüne (3) gidin. Dizinde Parquet dosyası için dosya adı (4) olarak GUID içeren bir klasör görmeniz gerekir.
Not Defteri hücresindeki veri çerçevesindeki Parquet yazma yöntemi, daha önce mevcut olmadığından bu dizini oluşturmuştur.
Not Defterini Synapse işlem hattına ekleme
Alıştırmanın başında açıkladığımız Eşleme Veri Akışı geri döndüğünüzde, Veri Akışı düzenleme işleminizin bir parçası olarak çalıştırıldıktan sonra bu not defterini yürütmek istediğinizi varsayalım. Bunu yapmak için, bu not defterini yeni bir Not Defteri etkinliği olarak bir işlem hattına eklersiniz.
Not defterine dönün. Not defterinin sağ üst köşesindeki Özellikler(1) öğesini seçin ve ad (2) için girin
Calculate Top 5 Products
.Not defterinin sağ üst köşesindeki İşlem hattına ekle(1) öğesini ve ardından Mevcut işlem hattı (2) seçeneğini belirleyin.
Kullanıcı Profili Verilerini ASA işlem hattına Yaz (1)'i ve ardından Ekle *(2)'yi seçin.
Synapse Studio, Notebook etkinliğini işlem hattına ekler. Not Defteri etkinliğini, Veri akışı etkinliğininsağındaki olacak şekilde yeniden düzenleyin. Veri akışı etkinliğini seçin ve Başarılıetkinliği işlem hattı bağlantısı yeşil kutusunu Not Defteri etkinliğine sürükleyin.
Başarı etkinliği oku, veri akışı etkinliği başarıyla çalıştırıldıktan sonra işlem hattına Not Defteri etkinliğini çalıştırmasını bildirir.
Not Defteri etkinliğini (1) seçin, ardından Ayarlar sekmesini (2) seçin, Temel parametreler (3) seçeneğini genişletin ve + Yeni (4) öğesini seçin. Ad alanına (5) girin
runId
. Tür (6) için Dize'yi seçin. Değer için Dinamik içerik ekle (7)'yi seçin.Sistem değişkenleri (1) altında İşlem hattı çalıştırma kimliği'ne tıklayın. Bu, dinamik içerik kutusuna (2) eklenir
@pipeline().RunId
. İletişim kutusunu kapatmak için Son 'u (3) seçin.İşlem hattı çalıştırma kimliği değeri, her işlem hattı çalıştırmasına atanmış benzersiz bir GUID'dir. Bu değeri Not Defteri parametresi olarak geçirerek Parquet dosyasının
runId
adı için kullanacağız. Ardından işlem hattı çalıştırma geçmişini gözden geçirip her işlem hattı çalıştırması için oluşturulan belirli Parquet dosyasını bulabiliriz.Değişikliklerinizi kaydetmek için Tümünü yayımla'yı ve ardından Yayımla'yı seçin.
Yayımlama tamamlandıktan sonra Tetikleyici ekle (1)'i ve ardından Güncelleştirilmiş işlem hattını çalıştırmak için Şimdi tetikle'yi (2) seçin.
Tetikleyiciyi çalıştırmak için Tamam'ı seçin.
İşlem hattı çalıştırmasını izleme
İzleyici hub'ı SQL, Apache Spark ve İşlem Hatları için geçerli ve geçmiş etkinlikleri izlemenize olanak tanır.
İzleyici hub'ına gidin.
İşlem hattı çalıştırmaları (1) öğesini seçin ve işlem hattı çalıştırmasının başarıyla tamamlanmasını (2) bekleyin. Görünümü yenilemeniz (3) gerekebilir.
İşlem hattının etkinlik çalıştırmalarını görüntülemek için işlem hattının adını seçin.
Hem Veri akışı etkinliğine hem de yeni Not Defteri etkinliğine (1) dikkat edin. İşlem hattı çalıştırma kimliği değerini (2) not edin. Bunu not defteri tarafından oluşturulan Parquet dosya adıyla karşılaştıracağız. Ayrıntılarını görüntülemek için İlk 5 Ürün Not Defterini Hesapla adını seçin (3).
Burada Not Defteri çalıştırma ayrıntılarını görüyoruz. İşler (2) aracılığıyla ilerleme durumunun kayıttan yürütülmesini izlemek için Kayıttan Yürütme(1) öğesini seçebilirsiniz. Alt kısımda Tanılama ve Günlükler'i farklı filtre seçenekleriyle görüntüleyebilirsiniz (3). Sağ tarafta süre, Livy Kimliği, Spark havuzu ayrıntıları gibi çalıştırma ayrıntılarını görüntüleyebiliriz. Bir işin ayrıntılarını görüntülemek için Ayrıntıları görüntüle bağlantısını seçin (5).
Spark uygulaması kullanıcı arabirimi, aşama ayrıntılarını görebildiğimiz yeni bir sekmede açılır. Aşama ayrıntılarını görüntülemek için DAG Görselleştirmesini genişletin.
Veri hub'ına geri dönün.
Bağlı sekmesini (1) seçin, ardından birincil data lake storage hesabında wwi-02 kapsayıcısını (2) seçin, top5-products klasörüne (3) gidin ve Parquet dosyası için adı İşlem Hattı çalıştırma kimliğiyle eşleşen bir klasör olduğunu doğrulayın.
Gördüğünüz gibi, adı daha önce not ettiğimiz İşlem Hattı çalıştırma kimliğiyle eşleşen bir dosyamız var:
İşlem Hattı çalıştırma kimliğini Not Defteri etkinliğindeki parametresine geçirtiğimiz için
runId
bu değerler eşleşmektedir.
Yardıma mı ihtiyacınız var? Sorun giderme kılavuzumuza gözatın veya sorun bildirerek belirli bir konuda geri bildiriminizi paylaşın.