Aracılığıyla paylaş


PowerShell kullanarak SQL Server'daki birden çok tablodan Azure SQL Veritabanı'na artımlı olarak veri yükleme

ŞUNLARA UYGULANIR: Azure Data Factory Azure Synapse Analytics

Tip

Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric veri hareketinden veri bilimine, gerçek zamanlı analizden iş zekasına ve raporlamaya kadar her şeyi kapsar. Ücretsiz olarak yeni bir deneme başlatmayı öğrenin!

Bu öğreticide, SQL Server veritabanındaki birden çok tablodan Azure SQL Veritabanı'na delta verileri yükleyen bir işlem hattı ile bir Azure Data Factory oluşturacaksınız.

Bu öğreticide aşağıdaki adımları gerçekleştireceksiniz:

  • Kaynak ve hedef veri depolarını hazırlayın.
  • Veri fabrikası oluşturma.
  • Kendi kendine barındırılan tümleştirme çalışma zamanı oluşturma.
  • Entegrasyon çalışma zamanını yükleyin.
  • Bağlı hizmet oluşturma.
  • Kaynak, havuz ve eşik veri kümeleri oluşturun.
  • Bir işlem hattı oluşturun, çalıştırın ve izleyin.
  • Sonuçları inceleyin.
  • Kaynak tablolarına veri ekleyin veya bu verileri güncelleştirin.
  • İşlem hattını tekrar çalıştırın ve takip edin.
  • Son sonuçları gözden geçirin.

Overview

Bu çözümü oluşturmak için önemli adımlar şunlardır:

  1. Filigran sütununu seçin.

    Kaynak veri deposundaki her tablo için bir sütun seçin. Bu sütun, her çalıştırma için yeni veya güncelleştirilmiş kayıtları tanımlayabilir. Normalde, satırlar oluşturulduğunda veya güncelleştirildiğinde seçilen bu sütundaki veriler (örneğin, last_modify_time veya kimlik) artmaya devam eder. Bu sütundaki en büyük değer eşik olarak kullanılır.

  2. Filigran değerini depolamak için bir veri deposu hazırlayın.

    Bu öğreticide, filigran değerini bir SQL veritabanında saklayacaksınız.

  3. Aşağıdaki eylemler ile bir işlem hattı oluşturun:

    1. İşlem hattına parametre olarak geçen kaynak tablosu adlarının bir listesi üzerinden yinelenen bir ForEach eylemi oluşturun. Her kaynak tablosu için, o tabloya yönelik delta yüklemesini gerçekleştirmek amacıyla aşağıdaki faaliyetleri başlatır.

    2. İki arama etkinliği oluşturun. Son eşik değerini almak için ilk Arama etkinliğini kullanın. Yeni eşik değerini almak için ikinci Arama etkinliğini kullanın. Bu filigran değerleri Kopyalama etkinliğine aktarılır.

    3. Kaynak veri deposundaki satırları, filigran sütununun değeri eski filigran değerinden büyük ve yeni filigran değerinden küçük veya buna eşit olan bir Kopyalama etkinliği oluşturun. Ardından, delta veriler kaynak veri deposundan Azure Blob depolama alanına yeni bir dosya olarak kopyalanır.

    4. Sonraki seferde çalışan işlem hattı için eşik değerini güncelleştiren bir StoredProcedure etkinliği oluşturun.

    Yüksek düzeyli çözüm diyagramı aşağıdaki gibidir:

    Artımlı olarak veri yükleme

Azure aboneliğiniz yoksa, başlamadan önce bir ücretsiz hesap oluşturun.

Prerequisites

  • SQL Sunucusu. Bu öğreticide kaynak veri deposu olarak bir SQL Server veritabanı kullanırsınız.
  • Azure SQL Veritabanı. havuz veri deposu olarak Azure SQL Veritabanı'da bir veritabanı kullanırsınız. SQL veritabanınız yoksa oluşturma adımları için bkz. Azure SQL Veritabanı'nda veritabanı oluşturma .

SQL Server veritabanınızda kaynak tabloları oluşturma

  1. SQL Server Management Studio (SSMS) veya Azure Data Studio'yu açın ve SQL Server veritabanınıza bağlanın.

  2. Sunucu Gezgini'nde (SSMS) veya Bağlantılar bölmesinde (Azure Data Studio) veritabanına sağ tıklayın ve Yeni Sorgu'yu seçin.

  3. customer_table ve project_table adlı tabloları oluşturmak için aşağıdaki SQL komutunu veritabanınızda çalıştırın:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Azure SQL Veritabanınızda hedef tablolar oluşturma

  1. SQL Server Management Studio (SSMS) veya Azure Data Studio'yu açın ve SQL Server veritabanınıza bağlanın.

  2. Sunucu Gezgini'nde (SSMS) veya Bağlantılar bölmesinde (Azure Data Studio) veritabanına sağ tıklayın ve Yeni Sorgu'yu seçin.

  3. customer_table ve project_table adlı tabloları oluşturmak için aşağıdaki SQL komutunu veritabanınızda çalıştırın:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Yüksek filigran değerini depolamak için Azure SQL Veritabanı'nda başka bir tablo oluşturma

  1. Filigran değerini depolamak için adlı watermarktable bir tablo oluşturmak için veritabanınızda aşağıdaki SQL komutunu çalıştırın:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Her iki kaynak tablonun ilk eşik değerlerini eşik tablosuna ekleyin.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Azure SQL Veritabanı'nda saklı yordam oluşturma

Veritabanınızda bir saklı yordam oluşturmak için aşağıdaki komutu çalıştırın. Bu saklı yordam, her işlem hattı çalıştırılması sonrasında filigran değerini günceller.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Azure SQL Veritabanı'nda veri türleri ve ek saklı yordamlar oluşturma

Veritabanınızda iki saklı yordam ve iki veri türü oluşturmak için aşağıdaki sorguyu çalıştırın. Bunlar, kaynak tablodaki verileri hedef tablolarla birleştirmek için kullanılır.

Başlangıçtaki süreci kolaylaştırmak için, delta verilerini bir tablo değişkeni kullanarak içeri aktaran bu Saklı Yordamları doğrudan kullanırız ve ardından bunları hedef veri deposuna entegre ederiz. Tablo değişkeninde "büyük" sayıda delta satırının (100'den fazla) depolanmasını beklemiyorum, bu yüzden dikkatli olun.

Çok sayıda delta satırını hedef depoyla birleştirmeniz gerekiyorsa, kopyalama etkinliğini kullanarak tüm delta verilerini önce hedef depodaki geçici bir "hazırlama" tablosuna kopyalamanızı ve ardından bunları "hazırlama" tablosundan "son" tabloya birleştirmek için tablo değişkeni kullanmadan kendi saklı yordamınızı oluşturmanızı öneririz.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Azure PowerShell'i yükleme ve yapılandırma başlığındaki yönergeleri izleyerek en son Azure PowerShell modüllerini yükleyin.

Veri fabrikası oluşturma

  1. Daha sonra PowerShell komutlarında kullanacağınız kaynak grubu adı için bir değişken tanımlayın. Aşağıdaki komut metnini PowerShell'e kopyalayın Azure kaynak grubu için tırnak işaretleri içinde bir ad belirtin ve ardından komutu çalıştırın. "adfrg" bunun bir örneğidir.

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Kaynak grubu zaten varsa, bunun üzerine yazmak istemeyebilirsiniz. $resourceGroupName değişkenine farklı bir değer atayın ve komutu yeniden çalıştırın.

  2. Veri fabrikasının konumu için bir değişken tanımlayın.

    $location = "East US"
    
  3. Azure kaynak grubunu oluşturmak için aşağıdaki komutu çalıştırın:

    New-AzResourceGroup $resourceGroupName $location
    

    Kaynak grubu zaten varsa, bunun üzerine yazmak istemeyebilirsiniz. $resourceGroupName değişkenine farklı bir değer atayın ve komutu yeniden çalıştırın.

  4. Veri fabrikasının adı için bir değişken tanımlayın.

    Important

    Veri fabrikasının adını genel olarak benzersiz olacak şekilde güncelleştirin. ADFIncMultiCopyTutorialFactorySP1127 buna bir örnektir.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Veri fabrikasını oluşturmak için aşağıdaki Set-AzDataFactoryV2 cmdlet'ini çalıştırın:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Aşağıdaki noktaları not edin:

  • Veri fabrikasının adı genel olarak benzersiz olmalıdır. Aşağıdaki hata iletisini alırsanız adı değiştirip yeniden deneyin:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Data Factory örnekleri oluşturmak için, Azure’da oturum açarken kullandığınız kullanıcı hesabı, katkıda bulunan veya sahip rollerinin üyesi ya da bir Azure aboneliğinin yöneticisi olmalıdır.

  • Data Factory'nin kullanılabileceği Azure bölgelerinin bir listesi için bir sonraki sayfada ilgilendiğiniz bölgeleri seçin ve Analytics'i genişleterek Data Factory: Products available by region (Bölgeye göre kullanılabilir durumdaki ürünler) bölümünü bulun. Veri fabrikası tarafından kullanılan veri depoları (Azure Depolama, SQL Veritabanı, SQL Yönetilen Örneği vb.) ve işlem (Azure HDInsight vb.) başka bölgelerde olabilir.

Şirket içinde barındırılan tümleştirme çalışma zamanı oluşturma

Bu bölümde, şirket içinde barındırılan bir tümleştirme çalışma zamanı oluşturur ve SQL Server veritabanını içeren bir şirket içi makine ile ilişkilendirirsiniz. Yerel olarak barındırılan entegrasyon çalışma zamanı, makinenizdeki SQL Server'dan Azure SQL Veritabanı'na veri kopyalayan bileşendir.

  1. Tümleştirme çalışma zamanının adı için bir değişken oluşturun. Benzersiz bir ad kullanın ve not edin. Daha sonra bu öğreticide kullanacaksınız.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Kendi kendine barındırılan tümleştirme çalışma zamanı oluşturma.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Örnek çıktı aşağıdaki gibidir:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Oluşturulan tümleştirme çalışma zamanının durumunu almak için aşağıdaki komutu çalıştırın. State özelliğinin değerinin NeedRegistration olarak ayarlandığını onaylayın.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Örnek çıktı aşağıdaki gibidir:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Bulutta Azure Data Factory hizmetine kendi barındırdığı entegrasyon çalışma zamanını kaydetmek için kullanılan kimlik doğrulama anahtarlarını almak üzere aşağıdaki komutu çalıştırın:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Örnek çıktı aşağıdaki gibidir:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Kendi kendine barındırılan tümleştirme çalışma zamanını makinenize yüklerken, kaydetmek için kullanılan anahtarlardan birini (çift tırnak işaretleri olmadan) kopyalayın.

Tümleştirme çalışma zamanı aracını yükleme

  1. Makinenizde tümleştirme çalışma zamanı zaten varsa, Program Ekle veya Kaldır'ı kullanarak kaldırın.

  2. Kendi içinde barındırılan entegrasyon çalışma zamanını yerel bir Windows makinesine indirin. Yüklemeyi çalıştırın.

  3. Microsoft Integration Runtime Kurulumuna Hoş Geldiniz sayfasında İleri'yi seçin.

  4. End-User Lisans Sözleşmesi sayfasında koşulları ve lisans sözleşmesini kabul edin ve İleri'yi seçin.

  5. Hedef Klasör sayfasında İleri'yi seçin.

  6. Microsoft Integration Runtime'ı yüklemeye hazır sayfasında Yükle'yi seçin.

  7. Microsoft Integration Runtime Kurulumu Tamamlandı sayfasında Son'u seçin.

  8. Entegrasyon Çalışma Zamanını Kaydetme (Yerel barındırılan) sayfasında, önceki bölümde kaydettiğiniz anahtarı yapıştırın ve Kaydetme'yi seçin.

    Tümleştirme çalışma zamanını kaydetme

  9. Yeni Tümleştirme Çalışma Zamanı (Şirket içinde barındırılan) Düğüm sayfasında Bitir'i seçin.

  10. Yerel barındırılan tümleştirme çalışma zamanı başarıyla kaydedildiğinde aşağıdaki iletiyi görürsünüz:

    Başarıyla kaydedildi

  11. Yerel Sunumlu Tümleştirme Çalışma Zamanını Kaydet sayfasında Başlat Configuration Manager'ı seçin.

  12. Düğüm bulut hizmetine bağlandığında aşağıdaki sayfayı görürsünüz:

    Düğüm bağlı sayfa

  13. Şimdi SQL Server veritabanınıza bağlantıyı test edin.

    Tanılama sekmesi

    a. Configuration Manager sayfasında Tanılama sekmesine gidin.

    b. Veri kaynağı türü için SqlServer'ı seçin.

    c. Sunucu adını girin.

    d. Veritabanı adını girin.

    e. Kimlik doğrulama modunu seçin.

    f. Kullanıcı adını girin.

    g. Kullanıcı adı için ilişkili parolayı girin.

    h. Tümleştirme çalışma zamanının SQL Server'a bağlanabildiğini onaylamak için Test'i seçin. Bağlantı başarılı olursa yeşil bir onay işareti görürsünüz. Bağlantı başarılı olmazsa bir hata iletisi görürsünüz. Tüm sorunları düzeltin ve tümleştirme çalışma zamanının SQL Server'a bağlanadığından emin olun.

    Note

    Kimlik doğrulama türü, sunucu, veritabanı, kullanıcı ve parola değerlerini not edin. Bunları bu öğreticinin ilerleyen bölümlerinde kullanacaksınız.

Bağlı hizmetler oluşturma

Veri depolarınızı ve işlem hizmetlerinizi veri fabrikasına bağlamak için veri fabrikasında bağlı hizmetler oluşturursunuz. Bu bölümde, Azure SQL Veritabanı'da SQL Server veritabanınıza ve veritabanınıza bağlı hizmetler oluşturacaksınız.

SQL Server bağlı hizmet oluşturma

Bu adımda SQL Server veritabanınızı veri fabrikasına bağlarsınız.

  1. C:\ADFTutorials\IncCopyMultiTableTutorial klasöründe (henüz yoksa yerel klasörleri oluşturun) aşağıdaki içeriğe sahip SqlServerLinkedService.json adlı bir JSON dosyası oluşturun. SQL Server'a bağlanmak için kullandığınız kimlik doğrulamasına göre doğru bölümü seçin.

    Important

    SQL Server'a bağlanmak için kullandığınız kimlik doğrulamasına göre doğru bölümü seçin.

    SQL kimlik doğrulaması kullanıyorsanız aşağıdaki JSON tanımını kopyalayın:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Windows kimlik doğrulaması kullanıyorsanız aşağıdaki JSON tanımını kopyalayın:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Important

    • SQL Server'a bağlanmak için kullandığınız kimlik doğrulamasına göre doğru bölümü seçin.
    • Tümleştirme çalışma zamanı adını< tümleştirme çalışma zamanınızın adıyla değiştirin>.
    • Dosyayı kaydetmeden önce sunucu adı<, veritabanı adı>, <kullanıcı adı> ve <parola> değerlerini SQL Server veritabanınızın değerleriyle değiştirin<>.
    • Kullanıcı hesabınızda veya sunucu adında eğik çizgi karakteri (\) kullanmanız gerekirse kaçış karakterini (\) kullanın. mydomain\\myuser bunun bir örneğidir.
  2. PowerShell'de, C:\ADFTutorials\IncCopyMultiTableTutorial klasörüne geçmek için aşağıdaki cmdlet'i çalıştırın.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. AzureStorageLinkedService bağlı hizmetini oluşturmak için Set-AzDataFactoryV2LinkedService cmdlet'ini çalıştırın. Aşağıdaki örnekte, ResourceGroupName ve DataFactoryName parametrelerinin değerlerini geçirirsiniz:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Örnek çıktı aşağıdaki gibidir:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

SQL Veritabanı bağlı hizmetini oluştur

  1. C:\ADFTutorials\IncCopyMultiTableTutorial klasöründe aşağıdaki içeriğe sahip AzureSQLDatabaseLinkedService.json adlı bir JSON dosyası oluşturun. (Henüz yoksa ADF klasörünü oluşturun.) Dosyayı kaydetmeden önce sunucu adı<, veritabanı adı>, <kullanıcı adı> ve <parolayı> SQL Server veritabanınızın adı, veritabanınızın adı, kullanıcı adı ve parola ile değiştirin<>.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. PowerShell'de Set-AzDataFactoryV2LinkedService cmdlet'ini çalıştırarak AzureSQLDatabaseLinkedService bağlı hizmetini oluşturun.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Örnek çıktı aşağıdaki gibidir:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Veri kümeleri oluşturma

Bu adımda veri kaynağı, veri hedefi ve eşiğin depolanacağı yeri temsil eden veri kümeleri oluşturacaksınız.

Kaynak veri kümesi oluşturma

  1. Aynı klasörde aşağıdaki içeriğe sahip SourceDataset.json adlı bir JSON dosyası oluşturun:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    İşlem hattındaki Kopyalama etkinliği, tüm tabloyu yüklemek yerine verileri yüklemek için bir SQL sorgusu kullanır.

  2. SourceDataset veri kümesini oluşturmak için Set-AzDataFactoryV2Dataset cmdlet'ini çalıştırın.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Cmdlet’in örnek çıktısı aşağıdaki gibidir:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Lavabo veri kümesi oluşturma

  1. Aşağıdaki içeriğe sahip aynı klasörde SinkDataset.json adlı bir JSON dosyası oluşturun. tableName öğesi, işlem hattı tarafından çalışma zamanında dinamik olarak ayarlanır. İşlem hattındaki ForEach etkinliği, tablo adlarının bir listesi üzerinden yinelenir ve her yinelemede tablo adını bu veri kümesine geçirir.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. SinkDataset veri kümesini oluşturmak için Set-AzDataFactoryV2Dataset cmdlet'ini çalıştırın.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Cmdlet’in örnek çıktısı aşağıdaki gibidir:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Eşik için veri kümesi oluşturma

Bu adımda üst eşik değerini depolamak için bir veri kümesi oluşturacaksınız.

  1. Aynı klasörde aşağıdaki içeriğe sahip WatermarkDataset.json adlı bir JSON dosyası oluşturun:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. WatermarkDataset veri kümesini oluşturmak için Set-AzDataFactoryV2Dataset cmdlet'ini çalıştırın.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Cmdlet’in örnek çıktısı aşağıdaki gibidir:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Boru hattı oluştur

Bu işlem hattı parametre olarak tablo adları listesini alır. ForEach etkinliği, tablo adları listesinde yinelenir ve aşağıdaki işlemleri gerçekleştirir:

  1. Eski filigran değerini (ilk değer veya son yinelemede kullanılan değer) almak için Arama etkinliğini kullanın.

  2. Yeni filigran değerini (kaynak tablodaki filigran sütununun en büyük değeri) almak için Arama etkinliğini kullanın.

  3. Bu iki filigran değeri arasındaki verileri kaynak veritabanından hedef veritabanına kopyalamak için Kopyalama etkinliğini kullanın.

  4. Bir sonraki yinelemenin ilk adımında kullanılacak eski filigran değerini güncelleştirmek için StoredProcedure etkinliğini kullanın.

İşlem hattını oluştur

  1. Aynı klasörde aşağıdaki içeriğe sahip IncrementalCopyPipeline.json adlı bir JSON dosyası oluşturun:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. IncrementalCopyPipeline işlem hattını oluşturmak için Set-AzDataFactoryV2Pipeline cmdlet'ini çalıştırın.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Örnek çıktı aşağıdaki gibidir:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

İşlem hattını çalıştırma

  1. Aynı klasörde aşağıdaki içeriğe sahip Parameters.json adlı bir parametre dosyası oluşturun:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Invoke-AzDataFactoryV2Pipeline cmdlet'ini kullanarak IncrementalCopyPipeline işlem hattını çalıştırın. Yer tutucuları kendi kaynak grubu ve veri fabrikası adınızla değiştirin.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

İşlem hattını izleme

  1. Azure portalınaoturum açın.

  2. Tüm hizmetler'i seçin, Veri fabrikaları anahtar sözcüğüyle arama yapın ve Veri fabrikaları'nı seçin.

  3. Veri fabrikaları listesinde veri fabrikanızı arayın ve Data factory sayfasını açmak için bu fabrikayı seçin.

  4. Data factory sayfasında, Azure Data Factory'yi ayrı bir sekmede başlatmak için Azure Data Factory Studio'yu Aç kutucuğunda Aç'ı seçin.

  5. Azure Data Factory giriş sayfasında sol taraftaki İzleyici'yi seçin.

    Azure Data Factory giriş sayfasını gösteren ekran görüntüsü.

  6. Tüm işlem hattı çalıştırmalarını ve bunların durumunu görebilirsiniz. Aşağıdaki örnekte işlem hattı çalıştırmasının durumunun Başarılı olarak belirtildiğini görebilirsiniz. İşlem hattına geçirilen parametreleri denetlemek için Parametreler sütunundaki bağlantıyı seçin. Bir hata oluştuysa, Hata sütununda bir bağlantı görürsünüz.

    İşlem hattınız da dahil olmak üzere bir veri fabrikası için işlem hattı çalıştırmalarını gösteren ekran görüntüsü.

  7. Eylemler sütununda bağlantıyı seçtiğinizde işlem hattı için tüm aktivite yürütmelerini görürsünüz.

  8. İşlem Hattı Çalıştırmaları görünümüne dönmek için Tüm İşlem Hattı Çalıştırmaları'nı seçin.

Sonuçları gözden geçirin

Verilerin kaynak tablolardan hedef tablolara kopyalandığını doğrulamak için, SQL Server Management Studio’da SQL veritabanında aşağıdaki sorguları çalıştırın:

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Her iki tablonun da eşik değerlerinin güncelleştirildiğine dikkat edin.

Kaynak tablolara daha fazla veri ekleme

customer_table içerisindeki mevcut bir satırı güncelleştirmek için aşağıdaki sorguyu kaynak SQL Server veritabanında çalıştırın. Project_table içine yeni bir satır ekleyin.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

İşlem hattını yeniden çalıştırma

  1. Şimdi aşağıdaki PowerShell komutunu yürüterek işlem hattını yeniden çalıştırın:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. İşlem hattını izleme bölümündeki yönergeleri izleyerek işlem hattı çalıştırmalarını izleyin . İşlem hattı durumu Devam Ediyor olduğunda, İşlem hattı çalıştırmasını iptal etmek için Eylemler altında başka bir eylem bağlantısı görürsünüz.

  3. İşlem hattı çalıştırması başarılı olana kadar listeyi yenilemek için Yenile'yi seçin.

  4. İsteğe bağlı olarak, bu işlem hattı çalıştırmasıyla ilişkili tüm etkinlik çalıştırmalarını görmek için Eylemler altındaki Etkinlik Çalıştırmalarını Görüntüle bağlantısını seçin.

Son sonuçları gözden geçirme

SQL Server Management Studio'da, güncelleştirilmiş/yeni verilerin kaynak tablolardan hedef tablolara kopyalandığını doğrulamak için hedef veritabanında aşağıdaki sorguları çalıştırın.

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

3 numaraya ait PersonID için yeni Name ve LastModifytime değerlerine dikkat edin.

Query

select * from project_table

Output

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

NewProject girişinin project_table tablosuna eklendiğine dikkat edin.

Query

select * from watermarktable

Output

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Her iki tablonun da eşik değerlerinin güncelleştirildiğine dikkat edin.

Bu öğreticide aşağıdaki adımları gerçekleştirdiniz:

  • Kaynak ve hedef veri depolarını hazırlayın.
  • Veri fabrikası oluşturma.
  • Kendi kendine barındırılan tümleştirme çalışma zamanı (IR) oluşturun.
  • Entegrasyon çalışma zamanını yükleyin.
  • Bağlı hizmet oluşturma.
  • Kaynak, havuz ve eşik veri kümeleri oluşturun.
  • Bir işlem hattı oluşturun, çalıştırın ve izleyin.
  • Sonuçları inceleyin.
  • Kaynak tablolarına veri ekleyin veya bu verileri güncelleştirin.
  • İşlem hattını tekrar çalıştırın ve takip edin.
  • Son sonuçları gözden geçirin.

Azure üzerinde bir Spark kümesi kullanarak veri dönüştürme hakkında bilgi edinmek için aşağıdaki öğreticiye geçin: