Skrip aliran data (DFS)

BERLAKU UNTUK:Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Aliran data tersedia di Alur Azure Data Factory dan Azure Synapse. Artikel ini berlaku untuk memetakan aliran data. Jika Anda baru mengenal transformasi, silakan lihat artikel pengantar Transformasi data menggunakan aliran data pemetaan.

Skrip aliran data (DFS) adalah metadata dasar dan mirip dengan bahasa pengkodean yang digunakan untuk menjalankan transformasi yang disertakan dalam alur data pemetaan. Setiap transformasi diwakili oleh serangkaian properti yang menyediakan informasi penting untuk menjalankan pekerjaan dengan benar. Skrip dapat dilihat dan diedit dari ADF dengan mengeklik tombol "skrip" di pita atas UI browser.

Script button

Misalnya, allowSchemaDrift: true, dalam transformasi sumber memberi tahu layanan untuk menyertakan semua kolom dari himpunan data sumber dalam aliran data meskipun kolom tersebut tidak disertakan dalam proyeksi skema.

Kasus penggunaan

DFS secara otomatis diproduksi oleh antarmuka pengguna. Anda dapat mengeklik tombol Skrip untuk melihat dan mengkustomisasi skrip. Anda juga dapat membuat skrip di luar ADF UI lalu meneruskannya ke cmdlet PowerShell. Saat men-debug aliran data yang kompleks, Anda mungkin merasa lebih mudah memindai kode skrip di belakang daripada memindai representasi grafik UI dari alur Anda.

Berikut adalah beberapa contoh kasus penggunaan:

  • Secara terprogram menghasilkan banyak aliran data yang cukup mirip, yaitu aliran data "stamping-out".
  • Ekspresi kompleks yang sulit dikelola di UI atau mengakibatkan masalah validasi.
  • Penelusuran kesalahan dan pemahaman yang lebih baik terhadap berbagai kesalahan yang ditampilkan selama eksekusi.

Saat membuat skrip aliran data untuk digunakan dengan PowerShell atau API, Anda harus menciutkan teks yang telah diformat menjadi satu baris. Anda dapat menyimpan tab dan garis baru sebagai karakter escape. Namun. teks harus diformat agar pas di dalam properti JSON. Ada tombol di antarmuka pengguna editor skrip di bagian bawah yang akan memformat skrip sebagai satu baris untuk Anda.

Copy button

Cara menambahkan transformasi

Penambahan transformasi memerlukan tiga langkah dasar: menambahkan data transformasi inti, mengalihkan aliran input, lalu mengalihkan aliran output. Proses ini paling mudah dapat dilihat dalam contoh. Katakanlah kita mulai dengan sumber sederhana untuk melakukan sink pada aliran data seperti berikut:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Jika kita memutuskan untuk menambahkan transformasi derive, pertama-tama kita harus membuat teks transformasi inti yang memiliki ekspresi sederhana untuk menambahkan kolom huruf besar baru yang disebut upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Kemudian, kita menggunakan DFS yang ada dan menambahkan transformasi:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Dan sekarang aliran masuk dirutekan kembali dengan mengidentifikasi transformasi mana yang diinginkan dari transformasi baru (dalam hal ini, source1 ini) dan menyalin nama aliran ke transformasi baru:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Terakhir transformasi yang ingin dilakukan setelah transformasi baru ini akan diidentifikasi, dan mengganti aliran inputnya (dalam hal ini, sink1) dengan nama aliran output transformasi baru kami:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Dasar-dasar DFS

DFS terdiri dari serangkaian transformasi yang terhubung, seperti sumber, sink, dan lainnya yang dapat menambahkan kolom baru, memfilter data, menggabungkan data, dan banyak lagi. Biasanya, skrip yang dimulai dengan satu atau beberapa sumber akan diikuti oleh banyak transformasi dan diakhiri dengan satu atau beberapa sink.

Semua sumber memiliki konstruksi dasar yang sama:

source(
  source properties
) ~> source_name

Misalnya, sumber sederhana dengan tiga kolom (movieId, judul, genre) adalah:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Semua transformasi selain sumber memiliki konstruksi dasar yang sama:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Misalnya, transformasi derive sederhana yang menggunakan kolom (judul) dan menimpanya dengan versi huruf besar adalah sebagai berikut:

source1 derive(
  title = upper(title)
) ~> derive1

Dan sink tanpa skema adalah:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Cuplikan skrip

Cuplikan skrip adalah kode Skrip Aliran Data yang dapat dibagikan dan dapat Anda gunakan untuk berbagi di seluruh alur data. Video di bawah ini menjelaskan cara menggunakan cuplikan skrip dan menggunakan Skrip Aliran Data untuk menyalin dan menempelkan bagian skrip di belakang grafik aliran data Anda:

Statistik ringkasan agregat

Tambahkan transformasi Agregat ke alur data Anda yang disebut "SummaryStats" lalu tempelkan kode di bawah ini untuk mengaktifkan fungsi agregat di skrip Anda, menggantikan SummaryStats yang ada. Cara ini akan memberikan pola generik pada statistik ringkasan profil data.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Anda juga dapat menggunakan sampel di bawah ini untuk menghitung jumlah unik dan jumlah baris berbeda dalam data Anda. Contoh di bawah ini dapat ditempelkan ke dalam aliran data dengan transformasi Agregat yang disebut ValueDistAgg. Contoh ini menggunakan kolom yang disebut "judul". Pastikan Anda mengganti "judul" dengan kolom string di data yang ingin Anda gunakan untuk mendapatkan hitungan nilai.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Menyertakan semua kolom secara agregat

Ini adalah pola agregat umum yang menunjukkan bagaimana Anda dapat menyimpan kolom yang tersisa dalam metadata keluaran Anda saat Anda membuat agregat. Dalam hal ini, kami menggunakan fungsi first() untuk memilih nilai pertama di setiap kolom yang namanya bukan "film". Untuk dapat menggunakan ini, buat transformasi Agregat yang disebut DistinctRows lalu tempelkan ini di skrip Anda pada bagian atas skrip agregat DistinctRows yang ada.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Membuat sidik jari hash baris

Gunakan kode ini dalam skrip aliran data Anda untuk membuat kolom turunan baru bernamaDWhash yang menghasilkan sha1 hash tiga kolom.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Anda juga dapat menggunakan skrip di bawah ini untuk menghasilkan hash baris menggunakan semua kolom yang ada di aliran Anda, tanpa perlu memberi nama setiap kolom:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg yang setara

Kode ini akan bertindak seperti fungsi T-SQL string_agg() dan akan menggabungkan nilai string ke dalam array. Kemudian Anda dapat mentransmisikan array tersebut ke dalam string untuk digunakan dengan tujuan SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Menghitung jumlah pembaruan, upsert, insert, dan penghapusan

Saat menggunakan transformasi Ubah Baris, Anda mungkin ingin menghitung jumlah pembaruan, upsert, insert, dan penghapusan hasil tersebut dari kebijakan Ubah Baris Anda. Tambahkan transformasi Agregat setelah baris perubahan Anda dan tempelkan Skrip Aliran Data ini ke dalam definisi agregat untuk jumlah tersebut.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Baris berbeda yang menggunakan semua kolom

Cuplikan ini akan menambahkan transformasi Agregat baru ke aliran data Anda, yang akan mengambil semua kolom yang masuk, menghasilkan hash yang digunakan untuk pengelompokan guna menghilangkan duplikat, lalu memberikan kemunculan pertama dari setiap duplikat sebagai output. Anda tidak perlu memberi nama kolom secara eksplisit, kolom tersebut akan dibuat secara otomatis dari aliran data masuk Anda.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Memeriksa NULL di semua kolom

Cuplikan ini dapat ditempelkan ke aliran data Anda untuk secara umum memeriksa semua kolom nilai NULL. Teknik ini memanfaatkan skema drift untuk melihat semua kolom di semua baris dan menggunakan Pemisahan Bersyarat untuk memisahkan baris dengan NULL dari baris tanpa NULL.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Drift skema peta otomatis dengan pilihan

Jika perlu memuat skema database yang ada dari set kolom masuk yang tidak diketahui atau dinamis, Anda harus memetakan kolom sisi kanan dalam transformasi Sink. Ini hanya diperlukan saat Anda memuat tabel yang ada. Tambahkan cuplikan ini sebelum Sink Anda untuk membuat Pilih yang memetakan kolom Anda secara otomatis. Biarkan pemetaan Sink Anda memetakan secara otomatis.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Tipe data kolom persisten

Tambahkan skrip ini di dalam definisi Kolom Turunan untuk menyimpan nama kolom dan tipe data dari aliran data Anda ke penyimpanan persisten menggunakan sink.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Isi ke bawah

Berikut ini cara menerapkan masalah umum "Isi" dengan himpunan data saat Anda ingin mengganti nilai NULL dengan nilai dari nilai non-NULL sebelumnya dalam urutan. Perhatikan bahwa operasi ini dapat memiliki implikasi kinerja negatif karena Anda harus membuat jendela sintetis di seluruh himpunan data Anda dengan nilai kategori "dummy". Selain itu, untuk membuat urutan data yang tepat agar dapat menemukan nilai non-NULL sebelumnya, Anda harus mengurutkannya berdasarkan nilai. Cuplikan di bawah ini membuat kategori sintetis sebagai "dummy" dan diurutkan berdasarkan kunci pengganti. Anda dapat menghapus kunci pengganti dan menggunakan kunci pengurutan khusus data. Cuplikan kode ini mengasumsikan bahwa Anda telah menambahkan transformasi Sumber yang bernama source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Rata-Rata Gerak

Rata-rata bergerak dapat diimplementasikan dengan sangat mudah dalam aliran data menggunakan transformasi Windows. Contoh di bawah ini menciptakan rata-rata pergerakan harga saham 15 hari Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Jumlah yang berbeda dari semua nilai kolom

Anda dapat menggunakan skrip ini untuk mengidentifikasi kolom kunci dan melihat kardinalitas semua kolom dalam aliran Anda dengan cuplikan skrip tunggal. Tambahkan skrip ini sebagai transformasi agregat ke aliran data Anda dan secara otomatis akan memberikan jumlah yang berbeda dari semua kolom.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Membandingkan nilai baris sebelumnya atau berikutnya

Cuplikan sampel ini menunjukkan bagaimana transformasi Jendela dapat digunakan untuk membandingkan nilai kolom dari konteks baris saat ini dengan nilai kolom dari baris sebelum dan sesudah baris saat ini. Dalam contoh ini, Derived Column digunakan untuk menghasilkan nilai dummy untuk mengaktifkan partisi jendela di seluruh kumpulan data. Transformasi Kunci Pengganti digunakan untuk menetapkan nilai kunci unik untuk setiap baris. Saat Anda menerapkan pola ini ke transformasi data Anda, Anda bisa menghapus kunci pengganti jika Anda adalah kolom yang ingin Anda pesan dan Anda bisa menghapus kolom turunan jika Anda memiliki kolom yang digunakan untuk mempartisi data Anda.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Berapa banyak kolom dalam data saya?

size(array(columns()))

Menjelajahi Aliran Data menggunakan artikel gambaran umum aliran data