Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
BERLAKU UNTUK:
Azure Data Factory
Azure Synapse Analytics
Tips
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Dalam tutorial ini, Anda akan membuat Azure Data Factory dengan alur yang memuat data delta dari beberapa tabel dalam database SQL Server ke Azure SQL Database.
Anda akan melakukan langkah-langkah berikut dalam tutorial ini:
- Siapkan penyimpanan data sumber dan tujuan.
- Membuat pabrik data.
- Membuat runtime integrasi yang dihost sendiri.
- Instal runtime integrasi ini.
- Membuat layanan tertaut.
- Buat himpunan data sumber, tempat tujuan, dan penanda waktu.
- Buat, jalankan, dan awasi pipa alir.
- Tinjau hasilnya.
- Tambahkan atau perbarui data dalam tabel sumber.
- Jalankan ulang dan pantau alur.
- Tinjau hasil akhir.
Gambaran Umum
Berikut adalah langkah-langkah penting untuk membuat solusi ini:
Pilih kolom tanda air.
Pilih satu kolom untuk setiap tabel di penyimpanan data sumber, yang dapat digunakan untuk mengidentifikasi rekaman baru atau yang diperbarui untuk setiap eksekusi. Biasanya, data di kolom yang dipilih ini (misalnya, last_modify_time atau ID) terus meningkat saat baris dibuat atau diperbarui. Nilai maksimum dalam kolom ini digunakan sebagai marka air.
Siapkan penyimpanan data untuk menyimpan nilai tanda air.
Dalam tutorial ini, Anda akan menyimpan nilai marka air dalam database SQL.
Buat alur dengan aktivitas berikut:
Buat aktivitas ForEach yang berulang melalui daftar nama tabel sumber yang diteruskan sebagai parameter ke alur. Untuk setiap tabel sumber, aktivitas berikut akan dipanggil untuk melakukan pemuatan delta untuk tabel tersebut.
Buat dua aktivitas pencarian. Gunakan operasi pencarian pertama untuk mengambil nilai penanda waktu terakhir. Gunakan aktivitas Pencarian kedua untuk mengambil nilai penanda baru. Nilai tanda air ini diteruskan ke operasi penyalinan.
Buat aktivitas Salin yang menyalin baris dari sumber penyimpanan data dengan nilai kolom penanda yang lebih besar dari nilai penanda lama dan kurang dari atau sama dengan nilai penanda baru. Kemudian, aktivitas ini akan menyalin data delta dari penyimpanan data sumber ke penyimpanan Azure Blob sebagai file baru.
Buat aktivitas StoredProcedure yang memperbarui nilai penanda untuk alur kerja yang berjalan selanjutnya.
Berikut adalah diagram solusi tingkat tinggi:
Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.
Prasyarat
- SQL Server. Anda akan menggunakan database SQL Server sebagai penyimpanan data sumber dalam tutorial ini.
- Microsoft Azure SQL database. Anda akan menggunakan database di Azure SQL Database sebagai penyimpanan data sink. Jika belum memiliki database di SQL Database, lihat Membuat database di Azure SQL Database untuk mengetahui langkah-langkah membuatnya.
Membuat tabel sumber di database SQL Server Anda
Buka SQL Server Management Studio (SSMS) atau Visual Studio Code, dan sambungkan ke database SQL Server Anda.
Di Server Explorer (SSMS) atau di panel Koneksi (Visual Studio Code), klik kanan database dan pilih Kueri Baru.
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
customer_tabledanproject_table:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Membuat tabel tujuan di Azure SQL Database Anda
Buka SQL Server Management Studio (SSMS) atau Visual Studio Code, dan sambungkan ke database SQL Server Anda.
Di Server Explorer (SSMS) atau di panel Koneksi (Visual Studio Code), klik kanan database dan pilih Kueri Baru.
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
customer_tabledanproject_table:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Membuat tabel lain di Azure SQL Database untuk menyimpan nilai ambang batas tinggi.
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
watermarktableuntuk menyimpan nilai watermark.create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );Sisipkan nilai marka air awal untuk kedua tabel sumber ke dalam tabel marka air.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Buat prosedur tersimpan di Azure SQL Database
Jalankan perintah berikut ini untuk membuat prosedur tersimpan di database Anda. Prosedur tersimpan ini memperbarui nilai marka air setelah setiap menjalankan pipeline.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Membuat jenis data dan prosedur tersimpan tambahan di Azure SQL Database Anda
Jalankan kueri berikut untuk membuat dua prosedur tersimpan dan dua jenis data di database Anda. Digunakan untuk menggabungkan data dari tabel sumber ke tabel target.
Untuk mempermudah proses memulai, kita langsung menggunakan Prosedur Tersimpan ini dengan mengirimkan data delta melalui variabel tabel, kemudian menggabungkannya ke penyimpanan tujuan. Berhati-hatilah agar tidak mengharapkan penyimpanan baris delta dalam jumlah besar (lebih dari 100) dalam variabel tabel.
Jika Anda perlu menggabungkan baris delta dalam jumlah besar ke penyimpanan tujuan, kami sarankan untuk menggunakan aktivitas salin untuk menyalin semua data delta ke dalam tabel "penahapan" sementara di penyimpanan tujuan terlebih dahulu, kemudian membuat prosedur tersimpan milik Anda sendiri tanpa menggunakan variabel tabel untuk menggabungkan data delta tersebut dari tabel “penahapan” ke tabel “akhir”.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Pasang modul Azure PowerShell terbaru dengan mengikuti instruksi dalam Cara memasang dan mengonfigurasikan Azure PowerShell.
Membuat pabrik data
Tentukan variabel untuk nama grup sumber daya yang Anda gunakan di perintah PowerShell nanti. Salin teks perintah berikut ke PowerShell, tentukan nama untuk grup sumber daya Azure dalam tanda kutip ganda, lalu jalankan perintah. Contohnya
"adfrg".$resourceGroupName = "ADFTutorialResourceGroup";Jika grup sumber daya sudah ada, Anda mungkin tidak ingin menggantinya. Tetapkan nilai yang berbeda ke variabel
$resourceGroupName, lalu jalankan perintah lagi.Tentukan variabel untuk lokasi pabrik data.
$location = "East US"Untuk membuat grup sumber daya Azure, jalankan perintah berikut:
New-AzResourceGroup $resourceGroupName $locationJika grup sumber daya sudah ada, Anda mungkin tidak ingin menggantinya. Tetapkan nilai yang berbeda ke variabel
$resourceGroupName, lalu jalankan perintah lagi.Tentukan variabel untuk nama pabrik data.
Penting
Perbarui nama pabrik data untuk menjadikannya unik secara global. Contohnya adalah ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";Untuk membuat pabrik data, jalankan cmdlet Set-AzDataFactoryV2 berikut:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Perhatikan poin berikut:
Nama pabrik data harus unik secara global. Jika Anda menerima kesalahan berikut, ubah nama dan coba lagi:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.Untuk membuat instans Data Factory, akun pengguna yang Anda gunakan untuk masuk ke Azure harus merupakan anggota dari peran kontributor atau pemilik, atau admin dari langganan Azure.
Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Storage, SQL Database, Azure SQL Managed Instance, dan lain-lain) dan komputasi (Azure HDInsight, dll.) yang digunakan pabrik data dapat berada di wilayah lain.
Membuat runtime integrasi yang dihost sendiri
Di bagian ini, Anda membuat runtime integrasi yang dihost sendiri dan mengaitkannya dengan mesin lokal dengan database Microsoft SQL Server. Komponen runtime integrasi yang di-host secara mandiri adalah yang menyalin data dari SQL Server di komputer Anda ke Azure SQL Database.
Buat variabel untuk nama runtime integrasi. Gunakan nama yang unik, dan buat catatan nama itu. Anda menggunakannya nanti dalam tutorial ini.
$integrationRuntimeName = "ADFTutorialIR"Membuat runtime integrasi yang dihost sendiri.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupNameBerikut adalah output sampel:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIRUntuk mengambil status runtime integrasi yang dibuat, jalankan perintah berikut. Konfirmasikan bahwa nilai properti Status diatur ke NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -StatusBerikut adalah output sampel:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>Untuk mengambil kunci autentikasi guna mendaftarkan runtime integrasi yang dihost sendiri dengan layanan Azure Data Factory di cloud, jalankan perintah berikut:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-JsonBerikut adalah output sampel:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }Salin salah satu kunci (kecualikan tanda kutip ganda) yang digunakan untuk mendaftarkan runtime integrasi yang dihosting sendiri yang Anda instal di komputer Anda dengan langkah-langkah berikut.
Instalasi alat runtime untuk integrasi
Jika Anda sudah memiliki runtime integrasi pada mesin Anda, hapus instalasinya dengan menggunakan Tambah atau Hapus Program.
Unduh runtime integrasi mandiri di mesin Windows lokal. Jalankan penginstalan.
Di halaman Selamat Datang di Penyiapan Microsoft Integration Runtime, pilih Berikutnya.
Di halaman Perjanjian Lisensi Pengguna Akhir, terima persyaratan dan perjanjian lisensi, lalu pilih Berikutnya.
Di halaman Folder Tujuan, pilih Berikutnya.
Di halaman Siap menginstal Microsoft Integration Runtime, pilih Instal.
Di halaman Penyiapan Microsoft Integration Runtime yang Telah Selesai, pilih Selesai.
Di halaman Daftarkan Integration Runtime (Dihost sendiri), tempelkan kunci yang Anda simpan di bagian sebelumnya, lalu pilih Daftarkan.
Di jendela Simpul Integration Runtime baru (Dihost sendiri), pilih Selesai.
Jika runtime integrasi yang dihost sendiri berhasil didaftarkan, Anda melihat pesan berikut:
Di halaman Daftarkan Integration Runtime (Dihost sendiri), pilih Luncurkan Configuration Manager.
Anda akan melihat halaman berikut saat simpulnya tersambung ke layanan cloud:
Sekarang, uji konektivitas ke SQL Server database Anda.
a. Di halaman Configuration Manager, masuk ke tab Diagnostik.
b. Pilih SqlServer untuk jenis sumber data.
c. Masukkan nama server.
d. Masukkan nama database.
e. Pilih mode autentikasi.
f. Masukkan nama pengguna.
g. Masukkan kata sandi yang terkait dengan nama pengguna.
h. Untuk mengonfirmasi bahwa runtime integrasi dapat terhubung ke SQL Server, pilih Uji. Jika koneksi berhasil, Anda akan melihat tanda centang hijau. Jika koneksi tidak berhasil, Anda akan melihat pesan kesalahan. Perbaiki masalah apa pun, dan pastikan bahwa runtime integrasi dapat terhubung ke SQL Server.
Catatan
Catat nilai untuk tipe autentikasi, server, database, pengguna, dan kata sandi. Anda menggunakannya nanti dalam tutorial ini.
Membuat layanan yang terhubung
Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Di bagian ini, Anda membuat layanan tertaut ke database SQL Server dan database Anda di Azure SQL Database.
Membuat layanan tertaut SQL Server
Dalam langkah ini, Anda menautkan database SQL Server Anda ke pabrik data.
Buat file JSON bernama SqlServerLinkedService.json di folder C:\ADFTutorials\IncCopyMultiTableTutorial (buat folder lokal jika belum ada) dengan konten berikut. Pilih bagian yang sesuai berdasarkan autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
Penting
Pilih bagian yang sesuai berdasarkan autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
Jika Anda menggunakan autentikasi SQL, salin definisi JSON berikut:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Jika Anda menggunakan autentikasi Windows, salin definisi JSON berikut:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }Penting
- Pilih bagian yang sesuai berdasarkan autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
- Ganti <nama runtime integrasi> dengan nama runtime integrasi milik Anda.
- Ganti <servername>, <databasename>, <username>, dan <password> dengan nilai database SQL Server Anda sebelum Anda menyimpan file.
- Jika Anda perlu menggunakan karakter garis miring (
\) di akun pengguna atau nama server, gunakan karakter escape (\). Contohnyamydomain\\myuser.
Di PowerShell, jalankan cmdlet berikut untuk beralih ke folder C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'Jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureStorageLinkedService. Dalam contoh berikut, Anda meneruskan nilai untuk parameter ResourceGroupName dan DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"Berikut adalah output sampel:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Membuat layanan tertaut Azure SQL Database
Buat file JSON bernama AzureSQLDatabaseLinkedService.json di C:\ADFTutorials\IncCopyMultiTableTutorial dengan konten berikut. (Buat folder ADF jika belum ada.) Ganti <namaserver>, <nama database>, <nama pengguna>, dan <kata sandi> dengan nama SQL Server Anda, nama database, nama pengguna, dan kata sandi Anda sebelum Anda menyimpan file.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }Di Azure PowerShell, jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"Berikut adalah output sampel:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Membuat himpunan data
Dalam langkah ini, Anda akan membuat himpunan data untuk mewakili sumber data, tujuan data, dan tempat untuk menyimpan penanda air.
Membuat himpunan data sumber
Buat file JSON bernama SourceDataset.json di folder yang sama dengan konten berikut:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }Aktivitas Salin dalam alur menggunakan kueri SQL untuk memuat data daripada memuat seluruh tabel.
Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data: SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Membuat kumpulan data sink
Buat file JSON bernama SinkDataset.json di folder yang sama dengan konten berikut. Elemen tableName diatur oleh alur secara dinamis pada waktu proses. Aktivitas ForEach dalam pipa data mengiterasi melalui daftar nama tabel dan meneruskan nama tabel ke set data ini dalam setiap perulangan.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Membuat kumpulan data untuk tanda air
Dalam langkah ini, Anda akan membuat set data untuk menyimpan nilai penanda level tinggi.
Buat file JSON bernama WatermarkDataset.json di folder yang sama dengan konten berikut:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Buat alur
Alur mengambil daftar nama tabel sebagai parameter. Aktivitas ForEach berulang melalui daftar nama tabel dan melakukan operasi berikut:
Gunakan aktivitas Lookup untuk mengambil nilai marka air lama (nilai awal atau yang digunakan pada iterasi terakhir).
Gunakan aktivitas Lookup untuk mengambil nilai watermark baru (nilai maksimum kolom watermark dalam tabel sumber).
Gunakan aktivitas Salin untuk menyalin data di antara kedua nilai penanda ini dari database sumber ke database tujuan.
Gunakan aktivitas StoredProcedure untuk memperbarui nilai tanda air lama. Nilai ini akan digunakan pada langkah pertama perulangan berikutnya.
Membuat alur
Buat file JSON bernama IncrementalCopyPipeline.json di folder yang sama dengan konten berikut:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }Jalankan cmdlet Set-AzDataFactoryV2Pipeline untuk membuat alur IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"Berikut adalah output sampel:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Menjalankan alur
Buat file parameter bernama Parameters.json di folder yang sama dengan konten berikut:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }Jalankan alur IncrementalCopyPipeline dengan menggunakan cmdlet Invoke-AzDataFactoryV2Pipeline. Ganti placeholder dengan nama grup sumber daya dan pabrik data Anda sendiri.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitor saluran pipa
Masuk ke portal Azure.
Pilih Semua layanan,cari dengan kata kunci Pabrik data, dan pilih Pabrik data.
Cari pabrik data Anda di daftar pabrik data, dan pilih untuk membuka halaman Pabrik data.
Pada halaman Pabrik data, pilih Buka di petak peta Buka Azure Data Factory Studio untuk meluncurkan Azure Data Factory di tab terpisah.
Di beranda Azure Data Factory, pilih Monitor di sisi kiri.
Anda dapat melihat semua alur berjalan dan statusnya. Perhatikan bahwa dalam contoh berikut, status pengoperasian pipeline adalah Berhasil. Anda dapat memeriksa parameter yang diteruskan ke alur dengan mengeklik tautan di kolom Parameter. Jika terjadi kesalahan, Anda akan melihat tautan di kolom Kesalahan.
Saat Anda memilih tautan di kolom Tindakan, Anda akan melihat semua aktivitas untuk pipeline.
Untuk kembali ke tampilan Eksekusi Alur, pilih Semua Eksekusi Alur.
Meninjau hasil
Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database SQL target untuk memverifikasi bahwa data disalin dari tabel sumber ke tabel tujuan:
Kueri
select * from customer_table
Hasil
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Kueri
select * from project_table
Hasil
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Kueri
select * from watermarktable
Hasil
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.
Menambahkan lebih banyak data ke tabel sumber
Jalankan kueri berikut ini terhadap database SQL Server sumber untuk memperbarui baris yang sudah ada di customer_table. Sisipkan baris baru ke dalam project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
Menjalankan ulang alur
Sekarang, jalankan ulang alur dengan menjalankan perintah PowerShell berikut:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"Pantau eksekusi alur dengan mengikuti instruksi di bagian Pantau alur. Saat status alur Sedang berlangsung, Anda akan melihat tautan tindakan lain di bawah Tindaka untuk membatalkan eksekusi alur.
Pilih Refresh untuk memperbarui daftar hingga alur berhasil.
Secara opsional, pilih tautanLihat Aktivitas Berjalan di bawah Tindakan untuk melihat semua aktivitas yang terkait dengan eksekusi alur ini.
Meninjau hasil akhir
Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database target untuk memverifikasi bahwa data yang telah diperbarui/baru telah disalin dari tabel sumber ke tabel tujuan.
Kueri
select * from customer_table
Hasil
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Perhatikan nilai baru dari Nama dan LastModifytime untuk PersonID untuk nomor 3.
Kueri
select * from project_table
Hasil
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Perhatikan bahwa entri NewProject telah ditambahkan ke project_table.
Kueri
select * from watermarktable
Hasil
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.
Konten terkait
Anda melakukan langkah-langkah berikut dalam tutorial ini:
- Siapkan penyimpanan data sumber dan tujuan.
- Membuat pabrik data.
- Membuat runtime integrasi yang dihost sendiri (IR).
- Instal runtime integrasi ini.
- Membuat layanan tertaut.
- Buat himpunan data sumber, tempat tujuan, dan penanda waktu.
- Buat, jalankan, dan awasi pipa alir.
- Tinjau hasilnya.
- Tambahkan atau perbarui data dalam tabel sumber.
- Jalankan ulang dan pantau alur.
- Tinjau hasil akhir.
Lanjutkan ke tutorial berikut untuk mempelajari tentang mengubah data dengan menggunakan kluster Spark di Azure: