Memuat data secara bertambah bertahap dari beberapa tabel di SQL Server ke Azure SQL Database menggunakan PowerShell

BERLAKU UNTUK:Azure Data Factory Azure Synapse Analytics

Tip

Cobalah Data Factory di Microsoft Fabric, solusi analitik all-in-one untuk perusahaan. Microsoft Fabric mencakup semuanya mulai dari pergerakan data hingga ilmu data, analitik real time, kecerdasan bisnis, dan pelaporan. Pelajari cara memulai uji coba baru secara gratis!

Dalam tutorial ini, Anda akan membuat Azure Data Factory dengan alur yang memuat data delta dari beberapa tabel dalam database SQL Server ke Azure SQL Database.

Anda akan melakukan langkah-langkah berikut dalam tutorial ini:

  • Siapkan penyimpanan data sumber dan tujuan.
  • Membuat pabrik data.
  • Membuat runtime integrasi yang dihost sendiri.
  • Pasang runtime integrasi.
  • Membuat layanan tertaut.
  • Buat himpunan data sumber, sink, dan marka air.
  • Buat, jalankan, dan pantau alur.
  • Tinjau hasilnya.
  • Tambahkan atau perbarui data dalam tabel sumber.
  • Jalankan ulang dan pantau alur.
  • Tinjau hasil akhir.

Gambaran Umum

Berikut adalah langkah-langkah penting untuk membuat solusi ini:

  1. Pilih kolom marka air.

    Pilih satu kolom untuk setiap tabel di penyimpanan data sumber, yang dapat digunakan untuk mengidentifikasi rekaman baru atau yang diperbarui untuk setiap eksekusi. Biasanya, data di kolom yang dipilih ini (misalnya, last_modify_time atau ID) terus meningkat saat baris dibuat atau diperbarui. Nilai maksimum dalam kolom ini digunakan sebagai marka air.

  2. Siapkan penyimpanan data untuk menyimpan nilai marka air.

    Dalam tutorial ini, Anda akan menyimpan nilai marka air dalam database SQL.

  3. Buat alur dengan aktivitas berikut:

    1. Buat aktivitas ForEach yang berulang melalui daftar nama tabel sumber yang diteruskan sebagai parameter ke alur. Untuk setiap tabel sumber, aktivitas berikut akan dipanggil untuk melakukan pemuatan delta untuk tabel tersebut.

    2. Buat dua aktivitas pencarian. Gunakan aktivitas Pencarian pertama untuk mengambil nilai marka air terakhir. Gunakan aktivitas Pencarian kedua untuk mengambil nilai marka air baru. Nilai marka air ini diteruskan ke aktivitas Salin.

    3. Buat aktivitas Copy yang menyalin baris dari penyimpanan data sumber dengan nilai kolom marka air yang lebih besar dari nilai marka air lama dan kurang dari nilai marka air baru. Kemudian, aktivitas ini akan menyalin data delta dari penyimpanan data sumber ke penyimpanan Azure Blob sebagai file baru.

    4. Buat aktivitas StoredProcedure yang memperbarui nilai marka air untuk alur yang berjalan selanjutnya.

    Berikut adalah diagram solusi tingkat tinggi:

    Incrementally load data

Jika Anda tidak memiliki langganan Azure, buat akun gratis sebelum Anda memulai.

Prasyarat

  • SQL Server. Anda akan menggunakan database SQL Server sebagai penyimpanan data sumber dalam tutorial ini.
  • Microsoft Azure SQL database. Anda akan menggunakan database di Azure SQL Database sebagai penyimpanan data sink. Jika belum memiliki database di SQL Database, lihat Membuat database di Azure SQL Database untuk mengetahui langkah-langkah membuatnya.

Membuat tabel sumber di database SQL Server Anda

  1. Buka SQL Server Management Studio (SSMS) atau Azure Data Studio, lalu sambungkan ke nama SQL Server Anda.

  2. Di Server Explorer (SSMS) atau di panel Koneksi (Azure Data Studio), klik kanan database dan pilih Kueri Baru.

  3. Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama customer_table dan project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Membuat tabel tujuan di Azure SQL Database Anda

  1. Buka SQL Server Management Studio (SSMS) atau Azure Data Studio, lalu sambungkan ke nama SQL Server Anda.

  2. Di Server Explorer (SSMS) atau di panel Koneksi (Azure Data Studio), klik kanan database dan pilih Kueri Baru.

  3. Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama customer_table dan project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Membuat tabel lain di Azure SQL Database Anda untuk menyimpan nilai marka air yang tinggi

  1. Jalankan perintah SQL berikut ini terhadap database Anda untuk membuat tabel bernama watermarktable guna menyimpan nilai marka air:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Sisipkan nilai marka air awal untuk kedua tabel sumber ke dalam tabel marka air.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Membuat prosedur tersimpan di Azure SQL Database Anda

Jalankan perintah berikut ini untuk membuat prosedur tersimpan di database Anda. Prosedur tersimpan ini memperbarui nilai marka air setelah setiap eksekusi alur berjalan.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Membuat jenis data dan prosedur tersimpan tambahan di Azure SQL Database Anda

Jalankan kueri berikut untuk membuat dua prosedur tersimpan dan dua jenis data di database Anda. Prosedur tersimpan dan jenis data ini digunakan untuk menggabungkan data dari tabel sumber ke tabel tujuan.

Untuk mempermudah proses mulai, kita langsung menggunakan Prosedur Tersimpan ini melewati data delta melalui variabel tabel, kemudian menggabungkannya ke penyimpanan tujuan. Ingat bahwa penyimpanan tujuan ini tidak mampu menyimpan baris delta dalam jumlah "besar" (lebih dari 100) dalam variabel tabel.

Jika Anda perlu menggabungkan baris delta dalam jumlah besar ke penyimpanan tujuan, kami sarankan untuk menggunakan aktivitas salin untuk menyalin semua data delta ke dalam tabel "penahapan" sementara di penyimpanan tujuan terlebih dahulu, kemudian membuat prosedur tersimpan milik Anda sendiri tanpa menggunakan variabel tabel untuk menggabungkan data delta tersebut dari tabel “penahapan” ke tabel “akhir”.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Pasang modul Azure PowerShell terbaru dengan mengikuti instruksi dalam Cara memasang dan mengonfigurasikan Azure PowerShell.

Membuat pabrik data

  1. Tentukan variabel untuk nama grup sumber daya yang Anda gunakan di perintah PowerShell nanti. Salin teks perintah berikut ke PowerShell, tentukan nama untuk grup sumber daya Azure dalam tanda kutip ganda, lalu jalankan perintah. Contohnya "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Jika grup sumber daya sudah ada, Anda mungkin tidak ingin menimpanya. Tetapkan nilai yang berbeda ke variabel $resourceGroupName, lalu jalankan perintah lagi.

  2. Tentukan variabel untuk lokasi pabrik data.

    $location = "East US"
    
  3. Untuk membuat grup sumber daya Azure, jalankan perintah berikut:

    New-AzResourceGroup $resourceGroupName $location
    

    Jika grup sumber daya sudah ada, Anda mungkin tidak ingin menimpanya. Tetapkan nilai yang berbeda ke variabel $resourceGroupName, lalu jalankan perintah lagi.

  4. Tentukan variabel untuk nama pabrik data.

    Penting

    Perbarui nama pabrik data untuk menjadikannya unik secara global. Contohnya adalah ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Untuk membuat pabrik data, jalankan cmdlet Set-AzDataFactoryV2 berikut:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Perhatikan poin berikut:

  • Nama pabrik data harus unik secara global. Jika Anda menerima kesalahan berikut, ubah nama dan coba lagi:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Untuk membuat instans Data Factory, akun pengguna yang Anda gunakan untuk masuk ke Azure harus merupakan anggota dari peran kontributor atau pemilik, atau admin dari langganan Azure.

  • Untuk daftar wilayah Azure tempat Data Factory saat ini tersedia, pilih wilayah yang menarik minat Anda pada halaman berikut, lalu perluas Analitik untuk menemukan Data Factory: Produk yang tersedia menurut wilayah. Penyimpanan data (Storage, SQL Database, Azure SQL Managed Instance, dan lain-lain) dan komputasi (Azure HDInsight, dll.) yang digunakan pabrik data dapat berada di wilayah lain.

Membuat runtime integrasi yang dihost sendiri

Di bagian ini, Anda membuat runtime integrasi yang dihost sendiri dan mengaitkannya dengan mesin lokal dengan database Microsoft SQL Server. Runtime integrasi yang dihost sendiri adalah komponen yang menyalin data dari SQL Server di komputer Anda ke Azure SQL Database.

  1. Buat variabel untuk nama runtime integrasi. Gunakan nama yang unik, dan buat catatan nama itu. Anda menggunakannya nanti dalam tutorial ini.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Membuat runtime integrasi yang dihost sendiri.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Berikut adalah output sampel:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Untuk mengambil status runtime integrasi yang dibuat, jalankan perintah berikut. Konfirmasikan bahwa nilai properti Status diatur ke NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Berikut adalah output sampel:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Untuk mengambil kunci autentikasi guna mendaftarkan runtime integrasi yang dihost sendiri dengan layanan Azure Data Factory di cloud, jalankan perintah berikut:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Berikut adalah output sampel:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Salin salah satu kunci (kecualikan tanda kutip ganda) yang digunakan untuk mendaftarkan runtime integrasi yang dihosting sendiri yang Anda instal di komputer Anda dengan langkah-langkah berikut.

Instal alat runtime integrasi

  1. Jika Anda sudah memiliki runtime integrasi pada mesin Anda, hapus instalasinya dengan menggunakan Tambah atau Hapus Program.

  2. Unduh runtime integrasi yang dihost sendiri di komputer Windows lokal. Jalankan penginstalan.

  3. Di halaman Selamat Datang di Penyiapan Microsoft Integration Runtime, pilih Berikutnya.

  4. Di halaman Perjanjian Lisensi Pengguna Akhir, terima persyaratan dan perjanjian lisensi, lalu pilih Berikutnya.

  5. Di halaman Folder Tujuan, pilih Berikutnya.

  6. Di halaman Siap menginstal Microsoft Integration Runtime, pilih Instal.

  7. Di halaman Penyiapan Microsoft Integration Runtime yang Telah Selesai, pilih Selesai.

  8. Di halaman Daftarkan Integration Runtime (Dihost sendiri), tempelkan kunci yang Anda simpan di bagian sebelumnya, lalu pilih Daftarkan.

    Register the integration runtime

  9. Di jendela Simpul Integration Runtime baru (Dihost sendiri), pilih Selesai.

  10. Jika runtime integrasi yang dihost sendiri berhasil didaftarkan, Anda melihat pesan berikut:

    Registered successfully

  11. Di halaman Daftarkan Integration Runtime (Dihost sendiri), pilih Luncurkan Configuration Manager.

  12. Anda akan melihat halaman berikut saat simpulnya tersambung ke layanan cloud:

    Node is connected page

  13. Sekarang, uji konektivitas ke SQL Server database Anda.

    Diagnostics tab

    a. Di halaman Configuration Manager, masuk ke tab Diagnostik.

    b. Pilih SqlServer untuk jenis sumber data.

    c. Masukkan nama server.

    d. Masukkan nama database.

    e. Pilih mode autentikasi.

    f. Masukkan nama pengguna.

    g. Masukkan kata sandi yang terkait dengan nama pengguna.

    h. Untuk mengonfirmasi bahwa runtime integrasi dapat terhubung ke SQL Server, pilih Uji. Jika koneksi berhasil, Anda akan melihat tanda centang hijau. Jika koneksi tidak berhasil, Anda akan melihat pesan kesalahan. Perbaiki masalah apa pun, dan pastikan bahwa runtime integrasi dapat terhubung ke SQL Server Anda.

    Catatan

    Catat nilai untuk tipe autentikasi, server, database, pengguna, dan kata sandi. Anda akan menggunakannya dalam tutorial ini.

Membuat layanan tertaut

Anda membuat layanan tertaut di pabrik data untuk menautkan penyimpanan data dan layanan komputasi ke pabrik data. Di bagian ini, Anda membuat layanan tertaut ke database SQL Server dan database Anda di Azure SQL Database.

Membuat layanan tertaut SQL Server

Dalam langkah ini, Anda menautkan database SQL Server Anda ke pabrik data.

  1. Buat file JSON bernama SqlServerLinkedService.json di folder C:\ADFTutorials\IncCopyMultiTableTutorial (buat folder lokal jika belum ada) dengan konten berikut. Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.

    Penting

    Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.

    Jika Anda menggunakan autentikasi SQL, salin definisi JSON berikut:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Jika Anda menggunakan autentikasi Windows, salin definisi JSON berikut:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Penting

    • Pilih bagian yang didasarkan pada autentikasi yang Anda gunakan untuk menyambungkan ke SQL Server.
    • Ganti <nama runtime integrasi> dengan nama runtime integrasi Anda.
    • Ganti <servername>, <databasename>, <username>, dan <password> dengan nilai database SQL Server Anda sebelum Anda menyimpan file.
    • Jika Anda perlu menggunakan karakter garis miring (\) di akun pengguna atau nama server, gunakan karakter escape (\). Contohnya mydomain\\myuser.
  2. Di PowerShell, jalankan cmdlet berikut untuk beralih ke folder C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureStorageLinkedService. Dalam contoh berikut, Anda meneruskan nilai untuk parameter ResourceGroupName dan DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Berikut adalah output sampel:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Membuat layanan tertaut Azure SQL Database

  1. Buat file JSON bernama AzureSQLDatabaseLinkedService.json di C:\ADFTutorials\IncCopyMultiTableTutorial dengan konten berikut. (Buat folder ADF jika belum ada.) Ganti <namaserver>, <nama database>, <nama pengguna>, dan <kata sandi> dengan nama SQL Server Anda, nama database, nama pengguna, dan kata sandi Anda sebelum Anda menyimpan file.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. Di Azure PowerShell, jalankan cmdlet Set-AzDataFactoryV2LinkedService untuk membuat layanan tertaut AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Berikut adalah output sampel:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Membuat himpunan data

Dalam langkah ini, Anda akan membuat himpunan data untuk mewakili sumber data, tujuan data, dan tempat untuk menyimpan marka air.

Membuat himpunan data sumber

  1. Buat file JSON bernama SourceDataset.json di folder yang sama dengan konten berikut:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Aktivitas Salin dalam alur menggunakan kueri SQL untuk memuat data daripada memuat seluruh tabel.

  2. Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data: SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Berikut adalah output sampel dari cmdlet tersebut:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Membuat himpunan data sink

  1. Buat file JSON bernama SinkDataset.json di folder yang sama dengan konten berikut. Elemen tableName diatur oleh alur secara dinamis pada waktu proses. Aktivitas ForEach dalam alur berulang melalui daftar nama tabel dan meneruskan nama tabel ke himpunan data ini dalam setiap perulangan.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Berikut adalah output sampel dari cmdlet tersebut:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Membuat himpunan data untuk marka air

Dalam langkah ini, Anda akan membuat himpunan data untuk menyimpan nilai marka air yang tinggi.

  1. Buat file JSON bernama WatermarkDataset.json di folder yang sama dengan konten berikut:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Jalankan cmdlet Set-AzDataFactoryV2Dataset untuk membuat himpunan data WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Berikut adalah output sampel dari cmdlet tersebut:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Buat alur

Alur mengambil daftar nama tabel sebagai parameter. Aktivitas ForEach berulang melalui daftar nama tabel dan melakukan operasi berikut:

  1. Gunakan aktivitas Pencarian untuk mengambil nilai marka air lama (nilai awal atau yang digunakan dalam perulangan terakhir).

  2. Gunakan aktivitas Pencarian untuk mengambil nilai marka air (nilai maksimum kolom marka air dalam tabel sumber).

  3. Gunakan aktivitas Salin untuk menyalin data di antara kedua nilai marka air ini dari database sumber ke database tujuan.

  4. Gunakan aktivitas StoredProcedure untuk memperbarui nilaimarka air lama yang akan digunakan pada langkah pertama perulangan berikutnya.

Membuat alur

  1. Buat file JSON bernama IncrementalCopyPipeline.json di folder yang sama dengan konten berikut:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Jalankan cmdlet Set-AzDataFactoryV2Pipeline untuk membuat alur IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Berikut adalah output sampel:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Menjalankan alur

  1. Buat file parameter bernama Parameters.json di folder yang sama dengan konten berikut:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Jalankan alur IncrementalCopyPipeline dengan menggunakan cmdlet Invoke-AzDataFactoryV2Pipeline. Ganti tempat penampung dengan grup sumber daya dan nama pabrik data Anda sendiri.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Monitor saluran pipa

  1. Masuk ke portal Azure.

  2. Pilih Semua layanan,cari dengan kata kunci Pabrik data, dan pilih Pabrik data.

  3. Cari pabrik data Anda di daftar pabrik data, dan pilih untuk membuka halaman Pabrik data.

  4. Pada halaman Pabrik data, pilih Buka di petak peta Buka Azure Data Factory Studio untuk meluncurkan Azure Data Factory di tab terpisah.

  5. Di beranda Azure Data Factory, pilih Monitor di sisi kiri.

    Screenshot shows the home page for Azure Data Factory.

  6. Anda dapat melihat semua alur berjalan dan statusnya. Perhatikan bahwa dalam contoh berikut, status eksekusi alur menjadi Berhasil. Anda dapat memeriksa parameter yang diteruskan ke alur dengan mengeklik tautan di kolom Parameter. Jika terjadi kesalahan, Anda akan melihat tautan di kolom Kesalahan.

    Screenshot shows pipeline runs for a data factory including your pipeline.

  7. Saat Anda memilih tautandi kolom Tindakan, Anda akan melihat semua aktivitas berjalan untuk alur.

  8. Untuk kembali ke tampilan Ekesekusi Alur, pilihSemua Eksekusi Alur.

Meninjau hasil

Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database SQL target untuk memverifikasi bahwa data disalin dari tabel sumber ke tabel tujuan:

Kueri

select * from customer_table

Hasil

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Kueri

select * from project_table

Hasil

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Kueri

select * from watermarktable

Hasil

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.

Menambahkan lebih banyak data ke tabel sumber

Jalankan kueri berikut ini terhadap database SQL Server sumber untuk memperbarui baris yang sudah ada di customer_table. Sisipkan baris baru ke dalam project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Menjalankan ulang alur

  1. Sekarang, jalankan ulang alur dengan menjalankan perintah PowerShell berikut:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Pantau eksekusi alur dengan mengikuti instruksi di bagian Pantau alur. Saat status alur Sedang berlangsung, Anda akan melihat tautan tindakan lain di bawah Tindaka untuk membatalkan eksekusi alur.

  3. Pilih Refresh untuk me-refresh daftar hingga alur berhasil.

  4. Secara opsional, pilih tautanLihat Aktivitas Berjalan di bawah Tindakan untuk melihat semua aktivitas yang terkait dengan eksekusi alur ini.

Meninjau hasil akhir

Di SQL Server Management Studio, jalankan kueri berikut ini terhadap database target untuk memverifikasi bahwa data yang telah diperbarui/baru telah disalin dari tabel sumber ke tabel tujuan.

Kueri

select * from customer_table

Hasil

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Perhatikan nilai baru dari Nama dan LastModifytime untuk PersonID untuk nomor 3.

Kueri

select * from project_table

Hasil

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Perhatikan bahwa entri NewProject telah ditambahkan ke project_table.

Kueri

select * from watermarktable

Hasil

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Perhatikan bahwa nilai marka air untuk kedua tabel telah diperbarui.

Anda melakukan langkah-langkah berikut dalam tutorial ini:

  • Siapkan penyimpanan data sumber dan tujuan.
  • Membuat pabrik data.
  • Membuat runtime integrasi yang dihost sendiri (IR).
  • Pasang runtime integrasi.
  • Membuat layanan tertaut.
  • Buat himpunan data sumber, sink, dan marka air.
  • Buat, jalankan, dan pantau alur.
  • Tinjau hasilnya.
  • Tambahkan atau perbarui data dalam tabel sumber.
  • Jalankan ulang dan pantau alur.
  • Tinjau hasil akhir.

Lanjutkan ke tutorial berikut untuk mempelajari tentang mengubah data dengan menggunakan kluster Spark di Azure: