Skrip aliran data (DFS)
BERLAKU UNTUK: Azure Data Factory Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Aliran data tersedia di Alur Azure Data Factory dan Azure Synapse. Artikel ini berlaku untuk memetakan aliran data. Jika Anda baru mengenal transformasi, silakan lihat artikel pengantar Transformasi data menggunakan aliran data pemetaan.
Skrip aliran data (DFS) adalah metadata dasar dan mirip dengan bahasa pengkodean yang digunakan untuk menjalankan transformasi yang disertakan dalam alur data pemetaan. Setiap transformasi diwakili oleh serangkaian properti yang menyediakan informasi penting untuk menjalankan pekerjaan dengan benar. Skrip dapat dilihat dan diedit dari ADF dengan mengeklik tombol "skrip" di pita atas UI browser.
Misalnya, allowSchemaDrift: true,
dalam transformasi sumber memberi tahu layanan untuk menyertakan semua kolom dari himpunan data sumber dalam aliran data meskipun kolom tersebut tidak disertakan dalam proyeksi skema.
Kasus penggunaan
DFS secara otomatis diproduksi oleh antarmuka pengguna. Anda dapat mengeklik tombol Skrip untuk melihat dan mengkustomisasi skrip. Anda juga dapat membuat skrip di luar ADF UI lalu meneruskannya ke cmdlet PowerShell. Saat men-debug aliran data yang kompleks, Anda mungkin merasa lebih mudah memindai kode skrip di belakang daripada memindai representasi grafik UI dari alur Anda.
Berikut adalah beberapa contoh kasus penggunaan:
- Secara terprogram menghasilkan banyak aliran data yang cukup mirip, yaitu aliran data "stamping-out".
- Ekspresi kompleks yang sulit dikelola di UI atau mengakibatkan masalah validasi.
- Penelusuran kesalahan dan pemahaman yang lebih baik terhadap berbagai kesalahan yang ditampilkan selama eksekusi.
Saat membuat skrip aliran data untuk digunakan dengan PowerShell atau API, Anda harus menciutkan teks yang telah diformat menjadi satu baris. Anda dapat menyimpan tab dan garis baru sebagai karakter escape. Namun. teks harus diformat agar pas di dalam properti JSON. Ada tombol di antarmuka pengguna editor skrip di bagian bawah yang akan memformat skrip sebagai satu baris untuk Anda.
Cara menambahkan transformasi
Penambahan transformasi memerlukan tiga langkah dasar: menambahkan data transformasi inti, mengalihkan aliran input, lalu mengalihkan aliran output. Proses ini paling mudah dapat dilihat dalam contoh. Katakanlah kita mulai dengan sumber sederhana untuk melakukan sink pada aliran data seperti berikut:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Jika kita memutuskan untuk menambahkan transformasi derive, pertama-tama kita harus membuat teks transformasi inti yang memiliki ekspresi sederhana untuk menambahkan kolom huruf besar baru yang disebut upperCaseTitle
:
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
Kemudian, kita menggunakan DFS yang ada dan menambahkan transformasi:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Dan sekarang aliran masuk dirutekan kembali dengan mengidentifikasi transformasi mana yang diinginkan dari transformasi baru (dalam hal ini, source1
ini) dan menyalin nama aliran ke transformasi baru:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Terakhir transformasi yang ingin dilakukan setelah transformasi baru ini akan diidentifikasi, dan mengganti aliran inputnya (dalam hal ini, sink1
) dengan nama aliran output transformasi baru kami:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Dasar-dasar DFS
DFS terdiri dari serangkaian transformasi yang terhubung, seperti sumber, sink, dan lainnya yang dapat menambahkan kolom baru, memfilter data, menggabungkan data, dan banyak lagi. Biasanya, skrip yang dimulai dengan satu atau beberapa sumber akan diikuti oleh banyak transformasi dan diakhiri dengan satu atau beberapa sink.
Semua sumber memiliki konstruksi dasar yang sama:
source(
source properties
) ~> source_name
Misalnya, sumber sederhana dengan tiga kolom (movieId, judul, genre) adalah:
source(output(
movieId as string,
title as string,
genres as string
),
allowSchemaDrift: true,
validateSchema: false) ~> source1
Semua transformasi selain sumber memiliki konstruksi dasar yang sama:
name_of_incoming_stream transformation_type(
properties
) ~> new_stream_name
Misalnya, transformasi derive sederhana yang menggunakan kolom (judul) dan menimpanya dengan versi huruf besar adalah sebagai berikut:
source1 derive(
title = upper(title)
) ~> derive1
Dan sink tanpa skema adalah:
derive1 sink(allowSchemaDrift: true,
validateSchema: false) ~> sink1
Cuplikan skrip
Cuplikan skrip adalah kode Skrip Aliran Data yang dapat dibagikan dan dapat Anda gunakan untuk berbagi di seluruh alur data. Video di bawah ini menjelaskan cara menggunakan cuplikan skrip dan menggunakan Skrip Aliran Data untuk menyalin dan menempelkan bagian skrip di belakang grafik aliran data Anda:
Statistik ringkasan agregat
Tambahkan transformasi Agregat ke alur data Anda yang disebut "SummaryStats" lalu tempelkan kode di bawah ini untuk mengaktifkan fungsi agregat di skrip Anda, menggantikan SummaryStats yang ada. Cara ini akan memberikan pola generik pada statistik ringkasan profil data.
aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats
Anda juga dapat menggunakan sampel di bawah ini untuk menghitung jumlah unik dan jumlah baris berbeda dalam data Anda. Contoh di bawah ini dapat ditempelkan ke dalam aliran data dengan transformasi Agregat yang disebut ValueDistAgg. Contoh ini menggunakan kolom yang disebut "judul". Pastikan Anda mengganti "judul" dengan kolom string di data yang ingin Anda gunakan untuk mendapatkan hitungan nilai.
aggregate(groupBy(title),
countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
numofdistinct = countDistinct(title)) ~> UniqDist
Menyertakan semua kolom secara agregat
Ini adalah pola agregat umum yang menunjukkan bagaimana Anda dapat menyimpan kolom yang tersisa dalam metadata keluaran Anda saat Anda membuat agregat. Dalam hal ini, kami menggunakan fungsi first()
untuk memilih nilai pertama di setiap kolom yang namanya bukan "film". Untuk dapat menggunakan ini, buat transformasi Agregat yang disebut DistinctRows lalu tempelkan ini di skrip Anda pada bagian atas skrip agregat DistinctRows yang ada.
aggregate(groupBy(movie),
each(match(name!='movie'), $$ = first($$))) ~> DistinctRows
Membuat sidik jari hash baris
Gunakan kode ini dalam skrip aliran data Anda untuk membuat kolom turunan baru bernamaDWhash
yang menghasilkan sha1
hash tiga kolom.
derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash
Anda juga dapat menggunakan skrip di bawah ini untuk menghasilkan hash baris menggunakan semua kolom yang ada di aliran Anda, tanpa perlu memberi nama setiap kolom:
derive(DWhash = sha1(columns())) ~> DWHash
String_agg yang setara
Kode ini akan bertindak seperti fungsi T-SQL string_agg()
dan akan menggabungkan nilai string ke dalam array. Kemudian Anda dapat mentransmisikan array tersebut ke dalam string untuk digunakan dengan tujuan SQL.
source1 aggregate(groupBy(year),
string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg
Menghitung jumlah pembaruan, upsert, insert, dan penghapusan
Saat menggunakan transformasi Ubah Baris, Anda mungkin ingin menghitung jumlah pembaruan, upsert, insert, dan penghapusan hasil tersebut dari kebijakan Ubah Baris Anda. Tambahkan transformasi Agregat setelah baris perubahan Anda dan tempelkan Skrip Aliran Data ini ke dalam definisi agregat untuk jumlah tersebut.
aggregate(updates = countIf(isUpdate(), 1),
inserts = countIf(isInsert(), 1),
upserts = countIf(isUpsert(), 1),
deletes = countIf(isDelete(),1)) ~> RowCount
Baris berbeda yang menggunakan semua kolom
Cuplikan ini akan menambahkan transformasi Agregat baru ke aliran data Anda, yang akan mengambil semua kolom yang masuk, menghasilkan hash yang digunakan untuk pengelompokan guna menghilangkan duplikat, lalu memberikan kemunculan pertama dari setiap duplikat sebagai output. Anda tidak perlu memberi nama kolom secara eksplisit, kolom tersebut akan dibuat secara otomatis dari aliran data masuk Anda.
aggregate(groupBy(mycols = sha2(256,columns())),
each(match(true()), $$ = first($$))) ~> DistinctRows
Memeriksa NULL di semua kolom
Cuplikan ini dapat ditempelkan ke aliran data Anda untuk secara umum memeriksa semua kolom nilai NULL. Teknik ini memanfaatkan skema drift untuk melihat semua kolom di semua baris dan menggunakan Pemisahan Bersyarat untuk memisahkan baris dengan NULL dari baris tanpa NULL.
split(contains(array(toString(columns())),isNull(#item)),
disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)
Drift skema peta otomatis dengan pilihan
Jika perlu memuat skema database yang ada dari set kolom masuk yang tidak diketahui atau dinamis, Anda harus memetakan kolom sisi kanan dalam transformasi Sink. Ini hanya diperlukan saat Anda memuat tabel yang ada. Tambahkan cuplikan ini sebelum Sink Anda untuk membuat Pilih yang memetakan kolom Anda secara otomatis. Biarkan pemetaan Sink Anda memetakan secara otomatis.
select(mapColumn(
each(match(true()))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> automap
Tipe data kolom persisten
Tambahkan skrip ini di dalam definisi Kolom Turunan untuk menyimpan nama kolom dan tipe data dari aliran data Anda ke penyimpanan persisten menggunakan sink.
derive(each(match(type=='string'), $$ = 'string'),
each(match(type=='integer'), $$ = 'integer'),
each(match(type=='short'), $$ = 'short'),
each(match(type=='complex'), $$ = 'complex'),
each(match(type=='array'), $$ = 'array'),
each(match(type=='float'), $$ = 'float'),
each(match(type=='date'), $$ = 'date'),
each(match(type=='timestamp'), $$ = 'timestamp'),
each(match(type=='boolean'), $$ = 'boolean'),
each(match(type=='long'), $$ = 'long'),
each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1
Isi ke bawah
Berikut ini cara menerapkan masalah umum "Isi" dengan himpunan data saat Anda ingin mengganti nilai NULL dengan nilai dari nilai non-NULL sebelumnya dalam urutan. Perhatikan bahwa operasi ini dapat memiliki implikasi kinerja negatif karena Anda harus membuat jendela sintetis di seluruh himpunan data Anda dengan nilai kategori "dummy". Selain itu, untuk membuat urutan data yang tepat agar dapat menemukan nilai non-NULL sebelumnya, Anda harus mengurutkannya berdasarkan nilai. Cuplikan di bawah ini membuat kategori sintetis sebagai "dummy" dan diurutkan berdasarkan kunci pengganti. Anda dapat menghapus kunci pengganti dan menggunakan kunci pengurutan khusus data. Cuplikan kode ini mengasumsikan bahwa Anda telah menambahkan transformasi Sumber yang bernama source1
source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
asc(sk, true),
Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1
Rata-Rata Gerak
Rata-rata bergerak dapat diimplementasikan dengan sangat mudah dalam aliran data menggunakan transformasi Windows. Contoh di bawah ini menciptakan rata-rata pergerakan harga saham 15 hari Microsoft.
window(over(stocksymbol),
asc(Date, true),
startRowOffset: -7L,
endRowOffset: 7L,
FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1
Jumlah yang berbeda dari semua nilai kolom
Anda dapat menggunakan skrip ini untuk mengidentifikasi kolom kunci dan melihat kardinalitas semua kolom dalam aliran Anda dengan cuplikan skrip tunggal. Tambahkan skrip ini sebagai transformasi agregat ke aliran data Anda dan secara otomatis akan memberikan jumlah yang berbeda dari semua kolom.
aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern
Membandingkan nilai baris sebelumnya atau berikutnya
Cuplikan sampel ini menunjukkan bagaimana transformasi Jendela dapat digunakan untuk membandingkan nilai kolom dari konteks baris saat ini dengan nilai kolom dari baris sebelum dan sesudah baris saat ini. Dalam contoh ini, Derived Column digunakan untuk menghasilkan nilai dummy untuk mengaktifkan partisi jendela di seluruh kumpulan data. Transformasi Kunci Pengganti digunakan untuk menetapkan nilai kunci unik untuk setiap baris. Saat Anda menerapkan pola ini ke transformasi data Anda, Anda bisa menghapus kunci pengganti jika Anda adalah kolom yang ingin Anda pesan dan Anda bisa menghapus kolom turunan jika Anda memiliki kolom yang digunakan untuk mempartisi data Anda.
source1 keyGenerate(output(sk as long),
startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
asc(sk, true),
prevAndCurr = lag(title,1)+'-'+last(title),
nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag
Berapa banyak kolom dalam data saya?
size(array(columns()))
Konten terkait
Menjelajahi Aliran Data menggunakan artikel gambaran umum aliran data