Memuat data secara bertambah bertahap dari beberapa tabel di SQL Server ke Azure SQL Database menggunakan PowerShell
BERLAKU UNTUK:Azure Data Factory Azure Synapse Analytics
Tip
Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!
Dalam tutorial ini, Anda akan membuat Azure Data Factory dengan alur yang memuat data delta dari beberapa tabel dalam database SQL Server ke Azure SQL Database.
Anda akan melakukan langkah-langkah berikut dalam tutorial ini:
- Siapkan penyimpanan data sumber dan tujuan.
- Membuat pabrik data.
- Membuat runtime integrasi yang dihost sendiri.
- Pasang runtime integrasi.
- Membuat layanan tertaut.
- Buat himpunan data sumber, sink, dan marka air.
- Buat, jalankan, dan pantau alur.
- Tinjau hasilnya.
- Tambahkan atau perbarui data dalam tabel sumber.
- Jalankan ulang dan pantau alur.
- Tinjau hasil akhir.
Gambaran Umum
Berikut adalah langkah-langkah penting untuk membuat solusi ini:
Pilih kolom marka air.
Pilih satu kolom untuk setiap tabel di penyimpanan data sumber, yang dapat digunakan untuk mengidentifikasi rekaman baru atau yang diperbarui untuk setiap eksekusi. Biasanya, data di kolom yang dipilih ini (misalnya, last_modify_time atau ID) terus meningkat saat baris dibuat atau diperbarui. Nilai maksimum dalam kolom ini digunakan sebagai marka air.
Siapkan penyimpanan data untuk menyimpan nilai marka air.
Dalam tutorial ini, Anda akan menyimpan nilai marka air dalam database SQL.
Buat alur dengan aktivitas berikut:
Buat aktivitas ForEach yang berulang melalui daftar nama tabel sumber yang diteruskan sebagai parameter ke alur. Untuk setiap tabel sumber, aktivitas berikut akan dipanggil untuk melakukan pemuatan delta untuk tabel tersebut.
Buat dua aktivitas pencarian. Gunakan aktivitas Pencarian pertama untuk mengambil nilai marka air terakhir. Gunakan aktivitas Pencarian kedua untuk mengambil nilai marka air baru. Nilai marka air ini diteruskan ke aktivitas Salin.
Buat aktivitas Copy yang menyalin baris dari penyimpanan data sumber dengan nilai kolom marka air yang lebih besar dari nilai marka air lama dan kurang dari nilai marka air baru. Kemudian, aktivitas ini akan menyalin data delta dari penyimpanan data sumber ke penyimpanan Azure Blob sebagai file baru.
Buat aktivitas StoredProcedure yang memperbarui nilai marka air untuk alur yang berjalan selanjutnya.
Berikut adalah diagram solusi tingkat tinggi:
Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.
Prasyarat
- SQL Server. Anda akan menggunakan database SQL Server sebagai penyimpanan data sumber dalam tutorial ini.
- Microsoft Azure SQL database. Anda akan menggunakan database di Azure SQL Database sebagai penyimpanan data sink. Jika belum memiliki database di SQL Database, lihat Membuat database di Azure SQL Database untuk mengetahui langkah-langkah membuatnya.
Membuat tabel sumber di database SQL Server Anda
Buka SQL Server Management Studio (SSMS) atau Azure Data Studio, lalu sambungkan ke nama SQL Server Anda.
Di Server Explorer (SSMS) atau di panel Koneksi (Azure Data Studio), klik kanan database dan pilih Kueri Baru.
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
customer_table
danproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime ); INSERT INTO customer_table (PersonID, Name, LastModifytime) VALUES (1, 'John','9/1/2017 12:56:00 AM'), (2, 'Mike','9/2/2017 5:23:00 AM'), (3, 'Alice','9/3/2017 2:36:00 AM'), (4, 'Andy','9/4/2017 3:21:00 AM'), (5, 'Anny','9/5/2017 8:06:00 AM'); INSERT INTO project_table (Project, Creationtime) VALUES ('project1','1/1/2015 0:00:00 AM'), ('project2','2/2/2016 1:23:00 AM'), ('project3','3/4/2017 5:16:00 AM');
Membuat tabel tujuan di Azure SQL Database Anda
Buka SQL Server Management Studio (SSMS) atau Azure Data Studio, lalu sambungkan ke nama SQL Server Anda.
Di Server Explorer (SSMS) atau di panel Koneksi (Azure Data Studio), klik kanan database dan pilih Kueri Baru.
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
customer_table
danproject_table
:create table customer_table ( PersonID int, Name varchar(255), LastModifytime datetime ); create table project_table ( Project varchar(255), Creationtime datetime );
Membuat tabel lain di Azure SQL Database Anda untuk menyimpan nilai marka air yang tinggi
Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama
watermarktable
guna menyimpan nilai marka air:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
Sisipkan nilai marka air awal untuk kedua tabel sumber ke dalam tabel marka air.
INSERT INTO watermarktable VALUES ('customer_table','1/1/2010 12:00:00 AM'), ('project_table','1/1/2010 12:00:00 AM');
Membuat prosedur tersimpan di Azure SQL Database Anda
Jalankan perintah berikut ini untuk membuat prosedur tersimpan di database Anda. Prosedur tersimpan ini memperbarui nilai marka air setelah setiap eksekusi alur berjalan.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Membuat jenis data dan prosedur tersimpan tambahan di Azure SQL Database Anda
Jalankan kueri berikut untuk membuat dua prosedur tersimpan dan dua jenis data di database Anda. Prosedur tersimpan dan jenis data ini digunakan untuk menggabungkan data dari tabel sumber ke tabel tujuan.
Untuk mempermudah proses mulai, kita langsung menggunakan Prosedur Tersimpan ini melewati data delta melalui variabel tabel, kemudian menggabungkannya ke penyimpanan tujuan. Ingat bahwa penyimpanan tujuan ini tidak mampu menyimpan baris delta dalam jumlah "besar" (lebih dari 100) dalam variabel tabel.
Jika Anda perlu menggabungkan baris delta dalam jumlah besar ke penyimpanan tujuan, kami sarankan untuk menggunakan aktivitas salin untuk menyalin semua data delta ke dalam tabel "penahapan" sementara di penyimpanan tujuan terlebih dahulu, kemudian membuat prosedur tersimpan milik Anda sendiri tanpa menggunakan variabel tabel untuk menggabungkan data delta tersebut dari tabel “penahapan” ke tabel “akhir”.
CREATE TYPE DataTypeforCustomerTable AS TABLE(
PersonID int,
Name varchar(255),
LastModifytime datetime
);
GO
CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS
BEGIN
MERGE customer_table AS target
USING @customer_table AS source
ON (target.PersonID = source.PersonID)
WHEN MATCHED THEN
UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
WHEN NOT MATCHED THEN
INSERT (PersonID, Name, LastModifytime)
VALUES (source.PersonID, source.Name, source.LastModifytime);
END
GO
CREATE TYPE DataTypeforProjectTable AS TABLE(
Project varchar(255),
Creationtime datetime
);
GO
CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS
BEGIN
MERGE project_table AS target
USING @project_table AS source
ON (target.Project = source.Project)
WHEN MATCHED THEN
UPDATE SET Creationtime = source.Creationtime
WHEN NOT MATCHED THEN
INSERT (Project, Creationtime)
VALUES (source.Project, source.Creationtime);
END
Azure PowerShell
Pasang modul Azure PowerShell terbaru dengan mengikuti instruksi dalam Cara memasang dan mengonfigurasikan Azure PowerShell.
Membuat pabrik data
Tentukan variabel untuk nama grup sumber daya yang Anda gunakan di perintah PowerShell nanti. Salin teks perintah berikut ke PowerShell, tentukan nama untuk grup sumber daya Azure dalam tanda kutip ganda, lalu jalankan perintah. Contohnya
"adfrg"
.$resourceGroupName = "ADFTutorialResourceGroup";
Jika grup sumber daya sudah ada, Anda mungkin tidak ingin menimpanya. Tetapkan nilai yang berbeda ke variabel
$resourceGroupName
, lalu jalankan perintah lagi.Tentukan variabel untuk lokasi pabrik data.
$location = "East US"
Untuk membuat grup sumber daya Azure, jalankan perintah berikut:
New-AzResourceGroup $resourceGroupName $location
Jika grup sumber daya sudah ada, Anda mungkin tidak ingin menimpanya. Tetapkan nilai yang berbeda ke variabel
$resourceGroupName
, lalu jalankan perintah lagi.Tentukan variabel untuk nama pabrik data.
Penting
Perbarui nama pabrik data untuk menjadikannya unik secara global. Contohnya adalah ADFIncMultiCopyTutorialFactorySP1127.
$dataFactoryName = "ADFIncMultiCopyTutorialFactory";
Untuk membuat pabrik data, jalankan cmdlet Set-AzDataFactoryV2 berikut:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
Perhatikan poin berikut:
Nama pabrik data harus unik secara global. Jika Anda menerima kesalahan berikut, ubah nama dan coba lagi:
Set-AzDataFactoryV2 : HTTP Status Code: Conflict Error Code: DataFactoryNameInUse Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
Untuk membuat instans Data Factory, akun pengguna yang Anda gunakan untuk masuk ke Azure harus merupakan anggota dari peran kontributor atau pemilik, atau admin dari langganan Azure.
Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Storage, SQL Database, Azure SQL Managed Instance, dan lain-lain) dan komputasi (Azure HDInsight, dll.) yang digunakan pabrik data dapat berada di wilayah lain.
Membuat runtime integrasi yang dihost sendiri
Di bagian ini, Anda membuat runtime integrasi yang dihost sendiri dan mengaitkannya dengan mesin lokal dengan database Microsoft SQL Server. Runtime integrasi yang dihost sendiri adalah komponen yang menyalin data dari SQL Server di komputer Anda ke Azure SQL Database.
Buat variabel untuk nama runtime integrasi. Gunakan nama yang unik, dan buat catatan nama itu. Anda menggunakannya nanti dalam tutorial ini.
$integrationRuntimeName = "ADFTutorialIR"
Membuat runtime integrasi yang dihost sendiri.
Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
Berikut adalah output sampel:
Name : <Integration Runtime name> Type : SelfHosted ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
Untuk mengambil status runtime integrasi yang dibuat, jalankan perintah berikut. Konfirmasikan bahwa nilai properti Status diatur ke NeedRegistration.
Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
Berikut adalah output sampel:
State : NeedRegistration Version : CreateTime : 9/24/2019 6:00:00 AM AutoUpdate : On ScheduledUpdateDate : UpdateDelayOffset : LocalTimeZoneOffset : InternalChannelEncryption : Capabilities : {} ServiceUrls : {eu.frontend.clouddatahub.net} Nodes : {} Links : {} Name : ADFTutorialIR Type : SelfHosted ResourceGroupName : <ResourceGroup name> DataFactoryName : <DataFactory name> Description : Id : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
Untuk mengambil kunci autentikasi guna mendaftarkan runtime integrasi yang dihost sendiri dengan layanan Azure Data Factory di cloud, jalankan perintah berikut:
Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
Berikut adalah output sampel:
{ "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=", "AuthKey2": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=" }
Salin salah satu kunci (kecualikan tanda kutip ganda) yang digunakan untuk mendaftarkan runtime integrasi yang dihosting sendiri yang Anda instal di komputer Anda dengan langkah-langkah berikut.
Instal alat runtime integrasi
Jika Anda sudah memiliki runtime integrasi pada mesin Anda, hapus instalasinya dengan menggunakan Tambah atau Hapus Program.
Unduh runtime integrasi yang dihost sendiri di komputer Windows lokal. Jalankan penginstalan.
Di halaman Selamat Datang di Penyiapan Microsoft Integration Runtime, pilih Berikutnya.
Di halaman Perjanjian Lisensi Pengguna Akhir, terima persyaratan dan perjanjian lisensi, lalu pilih Berikutnya.
Di halaman Folder Tujuan, pilih Berikutnya.
Di halaman Siap menginstal Microsoft Integration Runtime, pilih Instal.
Di halaman Penyiapan Microsoft Integration Runtime yang Telah Selesai, pilih Selesai.
Di halaman Daftarkan Integration Runtime (Dihost sendiri), tempelkan kunci yang Anda simpan di bagian sebelumnya, lalu pilih Daftarkan.
Di jendela Simpul Integration Runtime baru (Dihost sendiri), pilih Selesai.
Jika runtime integrasi yang dihost sendiri berhasil didaftarkan, Anda melihat pesan berikut:
Di halaman Daftarkan Integration Runtime (Dihost sendiri), pilih Luncurkan Configuration Manager.
Anda akan melihat halaman berikut saat simpulnya tersambung ke layanan cloud:
Sekarang, uji konektivitas ke SQL Server database Anda.
a. Di halaman Configuration Manager, masuk ke tab Diagnostik.
b. Pilih SqlServer untuk jenis sumber data.
c. Masukkan nama server.
d. Masukkan nama database.
e. Pilih mode autentikasi.
f. Masukkan nama pengguna.
g. Masukkan kata sandi yang terkait dengan nama pengguna.
h. Untuk mengonfirmasi bahwa runtime integrasi dapat terhubung ke SQL Server, pilih Uji. Jika koneksi berhasil, Anda akan melihat tanda centang hijau. Jika koneksi tidak berhasil, Anda akan melihat pesan kesalahan. Perbaiki masalah apa pun, dan pastikan bahwa runtime integrasi dapat terhubung ke SQL Server Anda.
Catatan
Catat nilai untuk tipe autentikasi, server, database, pengguna, dan kata sandi. Anda akan menggunakannya dalam tutorial ini.
Membuat layanan tertaut
Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Di bagian ini, Anda membuat layanan tertaut ke database SQL Server dan database Anda di Azure SQL Database.
Membuat layanan tertaut SQL Server
Dalam langkah ini, Anda menautkan database SQL Server Anda ke pabrik data.
Buat file JSON bernama SqlServerLinkedService.json di folder C:\ADFTutorials\IncCopyMultiTableTutorial (buat folder lokal jika belum ada) dengan konten berikut. Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
Penting
Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
Jika Anda menggunakan autentikasi SQL, salin definisi JSON berikut:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>" }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Jika Anda menggunakan autentikasi Windows, salin definisi JSON berikut:
{ "name":"SqlServerLinkedService", "properties":{ "annotations":[ ], "type":"SqlServer", "typeProperties":{ "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>", "userName":"<username> or <domain>\\<username>", "password":{ "type":"SecureString", "value":"<password>" } }, "connectVia":{ "referenceName":"<integration runtime name>", "type":"IntegrationRuntimeReference" } } }
Penting
- Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
- Ganti <nama runtime integrasi> dengan nama runtime integrasi Anda.
- Ganti <servername>, <databasename>, <username>, dan <password> dengan nilai database SQL Server Anda sebelum Anda menyimpan file.
- Jika Anda perlu menggunakan karakter garis miring (
\
) di akun pengguna atau nama server, gunakan karakter escape (\
). Contohnyamydomain\\myuser
.
Di PowerShell, jalankan cmdlet berikut untuk beralih ke folder C:\ADFTutorials\IncCopyMultiTableTutorial.
Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
Jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureStorageLinkedService. Dalam contoh berikut, Anda meneruskan nilai untuk parameter ResourceGroupName dan DataFactoryName:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
Berikut adalah output sampel:
LinkedServiceName : SqlServerLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
Membuat layanan tertaut Azure SQL Database
Buat file JSON bernama AzureSQLDatabaseLinkedService.json di C:\ADFTutorials\IncCopyMultiTableTutorial dengan konten berikut. (Buat folder ADF jika belum ada.) Ganti <namaserver>, <nama database>, <nama pengguna>, dan <kata sandi> dengan nama SQL Server Anda, nama database, nama pengguna, dan kata sandi Anda sebelum Anda menyimpan file.
{ "name":"AzureSQLDatabaseLinkedService", "properties":{ "annotations":[ ], "type":"AzureSqlDatabase", "typeProperties":{ "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;" } } }
Di Azure PowerShell, jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureSQLDatabaseLinkedService.
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
Berikut adalah output sampel:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
Membuat himpunan data
Dalam langkah ini, Anda akan membuat himpunan data untuk mewakili sumber data, tujuan data, dan tempat untuk menyimpan marka air.
Membuat himpunan data sumber
Buat file JSON bernama SourceDataset.json di folder yang sama dengan konten berikut:
{ "name":"SourceDataset", "properties":{ "linkedServiceName":{ "referenceName":"SqlServerLinkedService", "type":"LinkedServiceReference" }, "annotations":[ ], "type":"SqlServerTable", "schema":[ ] } }
Aktivitas Salin dalam alur menggunakan kueri SQL untuk memuat data daripada memuat seluruh tabel.
Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data: SourceDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : SourceDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
Membuat himpunan data sink
Buat file JSON bernama SinkDataset.json di folder yang sama dengan konten berikut. Elemen tableName diatur oleh alur secara dinamis pada waktu proses. Aktivitas ForEach dalam alur berulang melalui daftar nama tabel dan meneruskan nama tabel ke himpunan data ini dalam setiap perulangan.
{ "name":"SinkDataset", "properties":{ "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" }, "parameters":{ "SinkTableName":{ "type":"String" } }, "annotations":[ ], "type":"AzureSqlTable", "typeProperties":{ "tableName":{ "value":"@dataset().SinkTableName", "type":"Expression" } } } }
Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data SinkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : SinkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Membuat himpunan data untuk marka air
Dalam langkah ini, Anda akan membuat himpunan data untuk menyimpan nilai marka air yang tinggi.
Buat file JSON bernama WatermarkDataset.json di folder yang sama dengan konten berikut:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data WatermarkDataset.
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
Berikut adalah output sampel dari cmdlet tersebut:
DatasetName : WatermarkDataset ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
Buat alur
Alur mengambil daftar nama tabel sebagai parameter. Aktivitas ForEach berulang melalui daftar nama tabel dan melakukan operasi berikut:
Gunakan aktivitas Pencarian untuk mengambil nilai marka air lama (nilai awal atau yang digunakan dalam perulangan terakhir).
Gunakan aktivitas Pencarian untuk mengambil nilai marka air (nilai maksimum kolom marka air dalam tabel sumber).
Gunakan aktivitas Salin untuk menyalin data di antara kedua nilai marka air ini dari database sumber ke database tujuan.
Gunakan aktivitas StoredProcedure untuk memperbarui nilaimarka air lama yang akan digunakan pada langkah pertama perulangan berikutnya.
Membuat alur
Buat file JSON bernama IncrementalCopyPipeline.json di folder yang sama dengan konten berikut:
{ "name":"IncrementalCopyPipeline", "properties":{ "activities":[ { "name":"IterateSQLTables", "type":"ForEach", "dependsOn":[ ], "userProperties":[ ], "typeProperties":{ "items":{ "value":"@pipeline().parameters.tableList", "type":"Expression" }, "isSequential":false, "activities":[ { "name":"LookupOldWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"AzureSqlSource", "sqlReaderQuery":{ "value":"select * from watermarktable where TableName = '@{item().TABLE_NAME}'", "type":"Expression" } }, "dataset":{ "referenceName":"WatermarkDataset", "type":"DatasetReference" } } }, { "name":"LookupNewWaterMarkActivity", "type":"Lookup", "dependsOn":[ ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}", "type":"Expression" } }, "dataset":{ "referenceName":"SourceDataset", "type":"DatasetReference" }, "firstRowOnly":true } }, { "name":"IncrementalCopyActivity", "type":"Copy", "dependsOn":[ { "activity":"LookupOldWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] }, { "activity":"LookupNewWaterMarkActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "source":{ "type":"SqlServerSource", "sqlReaderQuery":{ "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'", "type":"Expression" } }, "sink":{ "type":"AzureSqlSink", "sqlWriterStoredProcedureName":{ "value":"@{item().StoredProcedureNameForMergeOperation}", "type":"Expression" }, "sqlWriterTableType":{ "value":"@{item().TableType}", "type":"Expression" }, "storedProcedureTableTypeParameterName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" }, "disableMetricsCollection":false }, "enableStaging":false }, "inputs":[ { "referenceName":"SourceDataset", "type":"DatasetReference" } ], "outputs":[ { "referenceName":"SinkDataset", "type":"DatasetReference", "parameters":{ "SinkTableName":{ "value":"@{item().TABLE_NAME}", "type":"Expression" } } } ] }, { "name":"StoredProceduretoWriteWatermarkActivity", "type":"SqlServerStoredProcedure", "dependsOn":[ { "activity":"IncrementalCopyActivity", "dependencyConditions":[ "Succeeded" ] } ], "policy":{ "timeout":"7.00:00:00", "retry":0, "retryIntervalInSeconds":30, "secureOutput":false, "secureInput":false }, "userProperties":[ ], "typeProperties":{ "storedProcedureName":"[dbo].[usp_write_watermark]", "storedProcedureParameters":{ "LastModifiedtime":{ "value":{ "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type":"Expression" }, "type":"DateTime" }, "TableName":{ "value":{ "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"Expression" }, "type":"String" } } }, "linkedServiceName":{ "referenceName":"AzureSQLDatabaseLinkedService", "type":"LinkedServiceReference" } } ] } } ], "parameters":{ "tableList":{ "type":"array" } }, "annotations":[ ] } }
Jalankan cmdlet Set-AzDataFactoryV2Pipeline untuk membuat alur IncrementalCopyPipeline.
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
Berikut adalah output sampel:
PipelineName : IncrementalCopyPipeline ResourceGroupName : <ResourceGroupName> DataFactoryName : <DataFactoryName> Activities : {IterateSQLTables} Parameters : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
Menjalankan alur
Buat file parameter bernama Parameters.json di folder yang sama dengan konten berikut:
{ "tableList": [ { "TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime", "TableType": "DataTypeforCustomerTable", "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table" }, { "TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime", "TableType": "DataTypeforProjectTable", "StoredProcedureNameForMergeOperation": "usp_upsert_project_table" } ] }
Jalankan alur IncrementalCopyPipeline dengan menggunakan cmdlet Invoke-AzDataFactoryV2Pipeline. Ganti tempat penampung dengan grup sumber daya dan nama pabrik data Anda sendiri.
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Monitor saluran pipa
Masuk ke portal Azure.
Pilih Semua layanan,cari dengan kata kunci Pabrik data, dan pilih Pabrik data.
Cari pabrik data Anda di daftar pabrik data, dan pilih untuk membuka halaman Pabrik data.
Pada halaman Pabrik data, pilih Buka di petak peta Buka Azure Data Factory Studio untuk meluncurkan Azure Data Factory di tab terpisah.
Di beranda Azure Data Factory, pilih Monitor di sisi kiri.
Anda dapat melihat semua alur berjalan dan statusnya. Perhatikan bahwa dalam contoh berikut, status eksekusi alur menjadi Berhasil. Anda dapat memeriksa parameter yang diteruskan ke alur dengan mengeklik tautan di kolom Parameter. Jika terjadi kesalahan, Anda akan melihat tautan di kolom Kesalahan.
Saat Anda memilih tautandi kolom Tindakan, Anda akan melihat semua aktivitas berjalan untuk alur.
Untuk kembali ke tampilan Ekesekusi Alur, pilihSemua Eksekusi Alur.
Meninjau hasil
Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database SQL target untuk memverifikasi bahwa data disalin dari tabel sumber ke tabel tujuan:
Kueri
select * from customer_table
Hasil
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 Alice 2017-09-03 02:36:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Kueri
select * from project_table
Hasil
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
Kueri
select * from watermarktable
Hasil
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-05 08:06:00.000
project_table 2017-03-04 05:16:00.000
Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.
Menambahkan lebih banyak data ke tabel sumber
Jalankan kueri berikut ini terhadap database SQL Server sumber untuk memperbarui baris yang sudah ada di customer_table. Sisipkan baris baru ke dalam project_table.
UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3
INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
Menjalankan ulang alur
Sekarang, jalankan ulang alur dengan menjalankan perintah PowerShell berikut:
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
Pantau eksekusi alur dengan mengikuti instruksi di bagian Pantau alur. Saat status alur Sedang berlangsung, Anda akan melihat tautan tindakan lain di bawah Tindaka untuk membatalkan eksekusi alur.
Pilih Refresh untuk me-refresh daftar hingga alur berhasil.
Secara opsional, pilih tautanLihat Aktivitas Berjalan di bawah Tindakan untuk melihat semua aktivitas yang terkait dengan eksekusi alur ini.
Meninjau hasil akhir
Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database target untuk memverifikasi bahwa data yang telah diperbarui/baru telah disalin dari tabel sumber ke tabel tujuan.
Kueri
select * from customer_table
Hasil
===========================================
PersonID Name LastModifytime
===========================================
1 John 2017-09-01 00:56:00.000
2 Mike 2017-09-02 05:23:00.000
3 NewName 2017-09-08 00:00:00.000
4 Andy 2017-09-04 03:21:00.000
5 Anny 2017-09-05 08:06:00.000
Perhatikan nilai baru dari Nama dan LastModifytime untuk PersonID untuk nomor 3.
Kueri
select * from project_table
Hasil
===================================
Project Creationtime
===================================
project1 2015-01-01 00:00:00.000
project2 2016-02-02 01:23:00.000
project3 2017-03-04 05:16:00.000
NewProject 2017-10-01 00:00:00.000
Perhatikan bahwa entri NewProject telah ditambahkan ke project_table.
Kueri
select * from watermarktable
Hasil
======================================
TableName WatermarkValue
======================================
customer_table 2017-09-08 00:00:00.000
project_table 2017-10-01 00:00:00.000
Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.
Konten terkait
Anda melakukan langkah-langkah berikut dalam tutorial ini:
- Siapkan penyimpanan data sumber dan tujuan.
- Membuat pabrik data.
- Membuat runtime integrasi yang dihost sendiri (IR).
- Pasang runtime integrasi.
- Membuat layanan tertaut.
- Buat himpunan data sumber, sink, dan marka air.
- Buat, jalankan, dan pantau alur.
- Tinjau hasilnya.
- Tambahkan atau perbarui data dalam tabel sumber.
- Jalankan ulang dan pantau alur.
- Tinjau hasil akhir.
Lanjutkan ke tutorial berikut untuk mempelajari tentang mengubah data dengan menggunakan kluster Spark di Azure:
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk