Veri akışı betiği (DFS)

UYGULANANLAR: Azure Data Factory Azure Synapse Analytics

Bahşiş

Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!

Veri akışları hem Azure Data Factory'de hem de Azure Synapse Pipelines'da kullanılabilir. Bu makale, eşleme veri akışları için geçerlidir. Dönüştürmeler hakkında yeniyseniz lütfen eşleme veri akışı kullanarak verileri dönüştürme başlıklı giriş makalesine bakın.

Veri akışı betiği (DFS), eşleme veri akışına dahil edilen dönüştürmeleri yürütmek için kullanılan kodlama diline benzer temel meta verilerdir. Her dönüştürme, işi düzgün çalıştırmak için gerekli bilgileri sağlayan bir dizi özellikle temsil edilir. Betik, tarayıcı kullanıcı arabiriminin üst şeridindeki "betik" düğmesine tıklayarak ADF'den görülebilir ve düzenlenebilir.

Script button

Örneğin, allowSchemaDrift: true, bir kaynak dönüştürmede hizmete, şema projeksiyonunda yer almasalar bile kaynak veri kümesindeki tüm sütunları veri akışına dahil etmelerini söyler.

Kullanım örnekleri

DFS, kullanıcı arabirimi tarafından otomatik olarak oluşturulur. Betiği görüntülemek ve özelleştirmek için Betik düğmesine tıklayabilirsiniz. Ayrıca ADF kullanıcı arabirimi dışında betikler oluşturabilir ve bunu PowerShell cmdlet'ine geçirebilirsiniz. Karmaşık veri akışlarında hata ayıklarken, akışlarınızın ui graph gösterimini taramak yerine arka planda betik kodunu taramayı daha kolay bulabilirsiniz.

Aşağıda birkaç örnek kullanım örneği verilmiştir:

  • Programsal olarak oldukça benzer birçok veri akışı üretir, yani "damgalama" veri akışları.
  • Kullanıcı arabiriminde yönetilmesi zor olan veya doğrulama sorunlarına neden olan karmaşık ifadeler.
  • Hata ayıklama ve yürütme sırasında döndürülen çeşitli hataları daha iyi anlama.

PowerShell veya API ile kullanmak üzere bir veri akışı betiği oluşturduğunuzda, biçimlendirilmiş metni tek bir satıra daraltmalısınız. Sekmeleri ve yeni çizgileri kaçış karakterleri olarak tutabilirsiniz. Ancak metin bir JSON özelliğine sığacak şekilde biçimlendirilmelidir. Alt kısımdaki betik düzenleyicisi kullanıcı arabiriminde betiği sizin için tek satır olarak biçimlendiren bir düğme vardır.

Copy button

Dönüşüm ekleme

Dönüştürmeleri eklemek için üç temel adım gerekir: çekirdek dönüştürme verilerini ekleme, giriş akışını yeniden yönlendirme ve ardından çıkış akışını yeniden yönlendirme. Bu, örnekte en kolay şekilde görülebilir. Veri akışını aşağıdaki gibi havuza almak için basit bir kaynakla başladığımızı düşünelim:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Türetilmiş bir dönüştürme eklemeye karar verirsek, önce adlı upperCaseTitleyeni bir büyük harf sütunu eklemek için basit bir ifade içeren çekirdek dönüştürme metnini oluşturmamız gerekir:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Ardından mevcut DFS'yi alıp dönüştürmeyi ekleyeceğiz:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Şimdi de yeni dönüşümün hangi dönüşümden sonra gelmesini istediğimizi belirleyerek (bu örnekte) source1gelen akışı yeniden yönlendiriyoruz ve akışın adını yeni dönüşüme kopyalayacağız:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Son olarak, bu yeni dönüşümden sonra gelmesini istediğimiz dönüşümü tanımlıyoruz ve giriş akışını (bu örnekte, sink1) yeni dönüşümümüzün çıkış akışı adıyla değiştiriyoruz:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

DFS temelleri

DFS, kaynaklar, havuzlar ve yeni sütunlar ekleyebilecek, verileri filtreleyebilecek, verileri birleştirebilecek ve çok daha fazlasını yapabilecek çeşitli bağlantılı dönüştürmelerden oluşur. Betik genellikle bir veya daha fazla kaynakla başlar ve ardından birçok dönüştürme olur ve bir veya daha fazla havuzla biter.

Kaynakların tümü aynı temel yapıya sahiptir:

source(
  source properties
) ~> source_name

Örneğin, üç sütunlu basit bir kaynak (movieId, title, genres) şöyle olabilir:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Kaynaklar dışındaki tüm dönüşümler aynı temel yapıya sahiptir:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Örneğin, bir sütun (başlık) alan ve büyük harfli bir sürümle üzerine yazan basit bir türet dönüştürme aşağıdaki gibi olacaktır:

source1 derive(
  title = upper(title)
) ~> derive1

Ve şeması olmayan bir havuz şöyle olur:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Betik parçacıkları

Betik kod parçacıkları, veri akışları arasında paylaşmak için kullanabileceğiniz Veri Akışı Betiğin paylaşılabilir kodudur. Aşağıdaki videoda betik kod parçacıklarının nasıl kullanılacağı ve betiğin bölümlerini veri akışı graflarınızın arkasına kopyalayıp yapıştırmak için Veri Akışı Betiği'nin nasıl kullanılacağı açıklanır:

Toplu özet istatistikleri

Veri akışınıza "SummaryStats" adlı bir Toplama dönüşümü ekleyin ve ardından betiğinizdeki toplama işlevi için aşağıdaki kodu yapıştırın ve mevcut SummaryStats değerini değiştirin. Bu, veri profili özet istatistikleri için genel bir desen sağlar.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Verilerinizdeki benzersiz satırların sayısını ve benzersiz satır sayısını saymak için aşağıdaki örneği de kullanabilirsiniz. Aşağıdaki örnek, ValueDistAgg adlı Toplama dönüştürmesi ile bir veri akışına yapıştırılabilir. Bu örnekte "title" adlı bir sütun kullanılır. "title" değerini, verinizdeki değer sayılarını almak için kullanmak istediğiniz dize sütunuyla değiştirdiğinizden emin olun.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Tüm sütunları toplamaya ekleme

Bu, toplamalar oluştururken çıktı meta verilerinizde kalan sütunları nasıl tutabileceğinizi gösteren genel bir toplama desenidir. Bu durumda, adı "film" olmayan her sütundaki ilk değeri seçmek için işlevini kullanırız first() . Bunu kullanmak için DistinctRows adlı bir Toplama dönüşümü oluşturun ve bunu mevcut DistinctRows toplama betiğinin üzerine betiğinize yapıştırın.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Satır karması parmak izi oluşturma

Üç sütunun karmasını oluşturan sha1 adlı DWhash yeni bir türetilmiş sütun oluşturmak için veri akışı betiğinizde bu kodu kullanın.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Aşağıdaki betiği, her sütunu adlandırmanıza gerek kalmadan akışınızda bulunan tüm sütunları kullanarak bir satır karması oluşturmak için de kullanabilirsiniz:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg eşdeğeri

Bu kod, T-SQL string_agg() işlevi gibi davranır ve dize değerlerini bir dizide toplar. Ardından bu diziyi SQL hedefleriyle kullanmak üzere bir dizeye dönüştürebilirsiniz.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Güncelleştirmelerin, upsert'lerin, eklemelerin, silmelerin sayısını sayma

Satır Değiştir dönüştürmesini kullanırken, Satır Değiştir ilkelerinizden kaynaklanan güncelleştirme, upsert, insert, delete sayısını saymak isteyebilirsiniz. Değiştirme satırınızdan sonra bir Toplama dönüşümü ekleyin ve bu Veri Akışı Betiği bu sayıların toplama tanımına yapıştırın.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Tüm sütunları kullanan ayrı satır

Bu kod parçacığı, veri akışınıza yeni bir Toplama dönüşümü ekler. Bu dönüşüm tüm gelen sütunları alır, yinelenenleri ortadan kaldırmak için gruplandırma için kullanılan bir karma oluşturur ve ardından her yinelemenin ilk tekrarını çıkış olarak sağlar. Sütunları açıkça adlandırmanız gerekmez; bunlar gelen veri akışınızdan otomatik olarak oluşturulur.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Tüm sütunlarda DLL'leri denetleme

Bu, tüm sütunlarınızın NULL değerlerini genel olarak denetlemek için veri akışınıza yapıştırabileceğiniz bir kod parçacığıdır. Bu teknik, tüm satırlardaki tüm sütunlara bakmak için şema kaymasından yararlanır ve koşullu bölme kullanarak satırları NUL'leri olmayan satırlardan ayırır.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Bir seçimle Otomatik Harita şema kayma

Bilinmeyen veya dinamik bir gelen sütun kümesinden var olan bir veritabanı şemasını yüklemeniz gerektiğinde, Havuz dönüşümünde sağ taraftaki sütunları eşlemeniz gerekir. Bu yalnızca mevcut bir tabloyu yüklerken gereklidir. Sütunlarınızı otomatik olarak eşleyen bir Select oluşturmak için bu kod parçacığını Havuzunuzun önüne ekleyin. Havuz eşlemenizi otomatik eşlemeye bırakın.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Sütun veri türlerini kalıcı hale

Veri akışınızdaki sütun adlarını ve veri türlerini havuz kullanarak kalıcı bir depoya depolamak için bu betiği Türetilmiş Sütun tanımına ekleyin.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Aşağı doldur

NULL değerlerini dizideki önceki NULL olmayan değerle değiştirmek istediğinizde veri kümeleriyle ilgili yaygın "Aşağı Doldur" sorununu şu şekilde uygulayabilirsiniz. Veri kümenizin tamamında "kukla" kategori değeriyle yapay bir pencere oluşturmanız gerektiğinden bu işlemin olumsuz performans etkilerine neden olabileceğini unutmayın. Ayrıca, önceki NULL olmayan değeri bulmak üzere uygun veri sırasını oluşturmak için bir değere göre sıralamanız gerekir. Aşağıdaki kod parçacığı yapay kategoriyi "kukla" olarak oluşturur ve vekil anahtara göre sıralar. Vekil anahtarı kaldırabilir ve verilere özgü sıralama anahtarınızı kullanabilirsiniz. Bu kod parçacığı, adlı bir Kaynak dönüşümü eklediğinizi varsayar source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Hareketli Ortalama

Hareketli ortalama, Windows dönüşümü kullanılarak veri akışlarında çok kolay bir şekilde uygulanabilir. Aşağıdaki örnek, Microsoft hisse senedi fiyatlarının 15 günlük hareketli ortalamasını oluşturur.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Tüm sütun değerlerinin ayrı sayısı

Bu betiği kullanarak anahtar sütunları tanımlayabilir ve akışınızdaki tüm sütunların kardinalitesini tek bir betik kod parçacığıyla görüntüleyebilirsiniz. Bu betiği veri akışınıza toplu dönüştürme olarak eklediğinizde, tüm sütunların ayrı sayılarını otomatik olarak sağlar.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Önceki veya sonraki satır değerlerini karşılaştırma

Bu örnek kod parçacığı, geçerli satır bağlamındaki sütun değerlerini geçerli satırdan önceki ve sonraki satırlardan gelen sütun değerleriyle karşılaştırmak için Pencere dönüştürmesinin nasıl kullanılabileceğini gösterir. Bu örnekte Türetilmiş Sütun, veri kümesinin tamamında bir pencere bölümünü etkinleştirmek üzere sahte bir değer oluşturmak için kullanılır. Vekil Anahtar dönüşümü, her satır için benzersiz bir anahtar değeri atamak için kullanılır. Bu deseni veri dönüşümlerinize uyguladığınızda, sıralama ölçütü olarak kullanmak istediğiniz bir sütunsanız vekil anahtarı kaldırabilir ve verilerinizi bölümleme amacıyla kullanacağınız sütunlarınız varsa türetilmiş sütunu kaldırabilirsiniz.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Verilerimde kaç sütun var?

size(array(columns()))

Veri akışlarına genel bakış makalesiyle başlayarak Veri Akışı'leri keşfetme